From 7ad3ecc78ca88f393c70a1056bcb193d584245ea Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:16:20 -0700 Subject: [PATCH] Revert "[serve] Remove ability to specify `route_prefix` at deployment level" (#48292) Reverts ray-project/ray#48223 Broke https://github.com/ray-project/ray/issues/48289 so revert to unblock the release. --- .../doc_code/image_classifier_example.py | 4 +- .../serve/doc_code/translator_example.py | 4 +- .../src/main/java/io/ray/serve/api/Serve.java | 34 ++--- .../ray/serve/api/ServeControllerClient.java | 25 ++-- .../src/main/java/io/ray/serve/dag/Graph.java | 63 +++++++++- .../io/ray/serve/deployment/Deployment.java | 36 +++++- .../serve/deployment/DeploymentCreator.java | 26 +++- .../CrossLanguageDeploymentTest.java | 10 +- .../serve/deployment/DeploymentGraphTest.java | 4 +- .../ray/serve/deployment/DeploymentTest.java | 4 +- .../serve/docdemo/ManagePythonDeployment.java | 2 +- .../io/ray/serve/repdemo/DeploymentDemo.java | 2 +- python/ray/serve/_private/api.py | 40 +++++- .../ray/serve/_private/application_state.py | 24 ++-- python/ray/serve/_private/client.py | 57 +++++++-- python/ray/serve/_private/controller.py | 24 +++- python/ray/serve/_private/deploy_utils.py | 2 +- .../_private/deployment_function_node.py | 11 ++ .../serve/_private/deployment_graph_build.py | 82 ++++++++++++- python/ray/serve/_private/utils.py | 22 ---- python/ray/serve/api.py | 60 +++++---- python/ray/serve/deployment.py | 109 +++++++++++++--- python/ray/serve/schema.py | 33 ++++- python/ray/serve/scripts.py | 16 ++- python/ray/serve/tests/BUILD | 2 + python/ray/serve/tests/test_api.py | 63 ++++++++-- python/ray/serve/tests/test_cli_2.py | 37 +++++- python/ray/serve/tests/test_deploy.py | 4 + python/ray/serve/tests/test_deploy_2.py | 19 +++ python/ray/serve/tests/test_deploy_app.py | 53 +++++++- .../tests/test_deployment_graph_ingress.py | 116 ++++++++++++++++++ .../ray/serve/tests/test_deployment_node.py | 82 +++++++++++++ python/ray/serve/tests/test_fastapi.py | 3 +- python/ray/serve/tests/test_http_routes.py | 45 +++---- python/ray/serve/tests/test_standalone.py | 42 ++++++- python/ray/serve/tests/test_websockets.py | 5 +- .../tests/unit/test_application_state.py | 27 +++- .../serve/tests/unit/test_deployment_class.py | 2 + python/ray/serve/tests/unit/test_schema.py | 36 ++++++ .../workloads/torch_tune_serve_test.py | 5 +- 40 files changed, 1042 insertions(+), 193 deletions(-) create mode 100644 python/ray/serve/tests/test_deployment_graph_ingress.py create mode 100644 python/ray/serve/tests/test_deployment_node.py diff --git a/doc/source/serve/doc_code/image_classifier_example.py b/doc/source/serve/doc_code/image_classifier_example.py index 3e4cf1fe8bcf..3f2fec9ef4ce 100644 --- a/doc/source/serve/doc_code/image_classifier_example.py +++ b/doc/source/serve/doc_code/image_classifier_example.py @@ -35,7 +35,7 @@ async def __call__(self, req: starlette.requests.Request): return await self.classify(req["image_url"]) -app = ImageClassifier.bind(downloader.bind()) +app = ImageClassifier.options(route_prefix="/classify").bind(downloader.bind()) # __serve_example_end__ @@ -65,7 +65,7 @@ async def __call__(self, req: starlette.requests.Request): # __serve_example_modified_end__ -serve.run(app, name="app1", route_prefix="/classify") +serve.run(app, name="app1") # __request_begin__ bear_url = "https://cdn.britannica.com/41/156441-050-A4424AEC/Grizzly-bear-Jasper-National-Park-Canada-Alberta.jpg" # noqa resp = requests.post("http://localhost:8000/classify", json={"image_url": bear_url}) diff --git a/doc/source/serve/doc_code/translator_example.py b/doc/source/serve/doc_code/translator_example.py index 602a58ba9655..3159f613b848 100644 --- a/doc/source/serve/doc_code/translator_example.py +++ b/doc/source/serve/doc_code/translator_example.py @@ -21,11 +21,11 @@ async def __call__(self, req: starlette.requests.Request): return self.translate(req["text"]) -app = Translator.bind() +app = Translator.options(route_prefix="/translate").bind() # __serve_example_end__ -serve.run(app, name="app2", route_prefix="/translate") +serve.run(app, name="app2") # __request_begin__ text = "Hello, the weather is quite fine today!" diff --git a/java/serve/src/main/java/io/ray/serve/api/Serve.java b/java/serve/src/main/java/io/ray/serve/api/Serve.java index 759ff63dfade..38d38b2cfe85 100644 --- a/java/serve/src/main/java/io/ray/serve/api/Serve.java +++ b/java/serve/src/main/java/io/ray/serve/api/Serve.java @@ -297,7 +297,8 @@ public static Deployment getDeployment(String name) { name, deploymentRoute.getDeploymentInfo().getDeploymentConfig(), deploymentRoute.getDeploymentInfo().getReplicaConfig(), - deploymentRoute.getDeploymentInfo().getVersion()); + deploymentRoute.getDeploymentInfo().getVersion(), + deploymentRoute.getRoute()); } /** @@ -306,7 +307,7 @@ public static Deployment getDeployment(String name) { * @param target A Serve application returned by `Deployment.bind()`. * @return A handle that can be used to call the application. */ - public static DeploymentHandle run(Application target) { + public static Optional run(Application target) { return run(target, true, Constants.SERVE_DEFAULT_APP_NAME, null, null); } @@ -317,11 +318,13 @@ public static DeploymentHandle run(Application target) { * @param blocking * @param name Application name. If not provided, this will be the only application running on the * cluster (it will delete all others). - * @param routePrefix Route prefix for HTTP requests. Defaults to '/'. + * @param routePrefix Route prefix for HTTP requests. If not provided, it will use route_prefix of + * the ingress deployment. If specified neither as an argument nor in the ingress deployment, + * the route prefix will default to '/'. * @param config * @return A handle that can be used to call the application. */ - public static DeploymentHandle run( + public static Optional run( Application target, boolean blocking, String name, @@ -332,20 +335,19 @@ public static DeploymentHandle run( throw new RayServeException("Application name must a non-empty string."); } - if (StringUtils.isNotBlank(routePrefix)) { - Preconditions.checkArgument( - routePrefix.startsWith("/"), "The route_prefix must start with a forward slash ('/')"); - } else { - routePrefix = "/"; - } - ServeControllerClient client = serveStart(config); List deployments = Graph.build(target.getInternalDagNode(), name); - Deployment ingressDeployment = deployments.get(deployments.size() - 1); + Deployment ingress = Graph.getAndValidateIngressDeployment(deployments); for (Deployment deployment : deployments) { // Overwrite route prefix + if (StringUtils.isNotBlank(deployment.getRoutePrefix()) + && StringUtils.isNotBlank(routePrefix)) { + Preconditions.checkArgument( + routePrefix.startsWith("/"), "The route_prefix must start with a forward slash ('/')"); + deployment.setRoutePrefix(routePrefix); + } deployment .getDeploymentConfig() .setVersion( @@ -354,8 +356,12 @@ public static DeploymentHandle run( : RandomStringUtils.randomAlphabetic(6)); } - client.deployApplication(name, routePrefix, deployments, ingressDeployment.getName(), blocking); - return client.getDeploymentHandle(ingressDeployment.getName(), name, true); + client.deployApplication(name, deployments, blocking); + + return Optional.ofNullable(ingress) + .map( + ingressDeployment -> + client.getDeploymentHandle(ingressDeployment.getName(), name, true)); } private static void init() { diff --git a/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java b/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java index 654eecdbaea8..fdbb37c5906f 100644 --- a/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java +++ b/java/serve/src/main/java/io/ray/serve/api/ServeControllerClient.java @@ -160,19 +160,11 @@ public BaseActorHandle getController() { /** * Deployment an application with deployment list. * - * @param name application name. - * @param routePrefix route prefix for the application. - * @param deployments deployment list. - * @param ingressDeploymentName name of the ingress deployment (the one that is exposed over - * HTTP). + * @param name application name + * @param deployments deployment list * @param blocking Wait for the applications to be deployed or not. */ - public void deployApplication( - String name, - String routePrefix, - List deployments, - String ingressDeploymentName, - boolean blocking) { + public void deployApplication(String name, List deployments, boolean blocking) { Object[] deploymentArgsArray = new Object[deployments.size()]; @@ -186,8 +178,8 @@ public void deployApplication( ByteString.copyFrom(deployment.getDeploymentConfig().toProtoBytes())) .setIngress(deployment.isIngress()) .setDeployerJobId(Ray.getRuntimeContext().getCurrentJobId().toString()); - if (deployment.getName() == ingressDeploymentName) { - deploymentArgs.setRoutePrefix(routePrefix); + if (deployment.getRoutePrefix() != null) { + deploymentArgs.setRoutePrefix(deployment.getRoutePrefix()); } deploymentArgsArray[i] = deploymentArgs.build().toByteArray(); } @@ -203,6 +195,7 @@ public void deployApplication( logDeploymentReady( deployment.getName(), deployment.getVersion(), + deployment.getUrl(), "component=serve deployment=" + deployment.getName()); } } @@ -245,11 +238,13 @@ private void waitForApplicationRunning(String name, Long timeoutS) { "Application {} did not become RUNNING after {}s.", name, timeoutS)); } - private void logDeploymentReady(String name, String version, String tag) { + private void logDeploymentReady(String name, String version, String url, String tag) { + String urlPart = url != null ? MessageFormatter.format(" at `{}`", url) : ""; LOGGER.info( - "Deployment '{}{}' is ready. {}", + "Deployment '{}{}' is ready {}. {}", name, StringUtils.isNotBlank(version) ? "':'" + version : "", + urlPart, tag); } diff --git a/java/serve/src/main/java/io/ray/serve/dag/Graph.java b/java/serve/src/main/java/io/ray/serve/dag/Graph.java index 7fe048ec4981..8a999cfa8d94 100644 --- a/java/serve/src/main/java/io/ray/serve/dag/Graph.java +++ b/java/serve/src/main/java/io/ray/serve/dag/Graph.java @@ -1,9 +1,13 @@ package io.ray.serve.dag; +import com.google.common.base.Preconditions; import io.ray.serve.deployment.Deployment; import io.ray.serve.handle.DeploymentHandle; +import io.ray.serve.util.CollectionUtil; import io.ray.serve.util.CommonUtil; import io.ray.serve.util.DAGUtil; +import io.ray.serve.util.MessageFormatter; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -15,7 +19,37 @@ public class Graph { public static List build(DAGNode rayDagRootNode, String name) { DAGNodeBase serveRootDag = rayDagRootNode.applyRecursive(node -> transformRayDagToServeDag(node, name)); - return extractDeployments(serveRootDag); + List deployments = extractDeployments(serveRootDag); + List deploymentsWithHttp = processIngressDeploymentInServeDag(deployments); + return deploymentsWithHttp; + } + + private static List processIngressDeploymentInServeDag(List deployments) { + if (CollectionUtil.isEmpty(deployments)) { + return deployments; + } + + Deployment ingressDeployment = deployments.get(deployments.size() - 1); + if (StringUtils.isBlank(ingressDeployment.getRoutePrefix()) + || StringUtils.equals( + ingressDeployment.getRoutePrefix(), "/" + ingressDeployment.getName())) { + ingressDeployment.setRoutePrefix("/"); + } + + for (int i = 0; i < deployments.size() - 1; i++) { + Deployment deployment = deployments.get(i); + Preconditions.checkArgument( + StringUtils.isBlank(deployment.getRoutePrefix()) + || StringUtils.equals(deployment.getRoutePrefix(), "/" + deployment.getName()), + MessageFormatter.format( + "Route prefix is only configurable on the ingress deployment. " + + "Please do not set non-default route prefix: " + + "{} on non-ingress deployment of the " + + "serve DAG. ", + deployment.getRoutePrefix())); + deployment.setRoutePrefix(null); + } + return deployments; } public static DAGNodeBase transformRayDagToServeDag(DAGNodeBase dagNode, String appName) { @@ -30,6 +64,12 @@ public static DAGNodeBase transformRayDagToServeDag(DAGNodeBase dagNode, String deploymentName = deploymentShell.getName(); } + String routePrefix = + StringUtils.isBlank(deploymentShell.getRoutePrefix()) + || !StringUtils.equals(deploymentShell.getRoutePrefix(), "/" + deploymentName) + ? deploymentShell.getRoutePrefix() + : "/" + deploymentName; + Object[] replacedDeploymentInitArgs = new Object[clsNode.getBoundArgs().length]; for (int i = 0; i < clsNode.getBoundArgs().length; i++) { replacedDeploymentInitArgs[i] = @@ -44,6 +84,7 @@ public static DAGNodeBase transformRayDagToServeDag(DAGNodeBase dagNode, String .setDeploymentDef(clsNode.getClassName()) .setName(deploymentName) .setInitArgs(replacedDeploymentInitArgs) + .setRoutePrefix(routePrefix) .create(false); return new DeploymentNode( @@ -70,6 +111,26 @@ public static List extractDeployments(DAGNodeBase rootNode) { return deployments.values().stream().collect(Collectors.toList()); } + public static Deployment getAndValidateIngressDeployment(List deployments) { + + List ingressDeployments = new ArrayList<>(); + for (Deployment deployment : deployments) { + if (StringUtils.isNotBlank(deployment.getRoutePrefix())) { + ingressDeployments.add(deployment); + } + } + + Preconditions.checkArgument( + ingressDeployments.size() == 1, + MessageFormatter.format( + "Only one deployment in an Serve Application or DAG can have non-None route prefix. {} ingress deployments found: {}", + ingressDeployments.size(), + ingressDeployments)); + + ingressDeployments.get(0).setIngress(true); + return ingressDeployments.get(0); + } + public static DeploymentHandle replaceWithHandle(DAGNode node) { if (node instanceof DeploymentNode) { DeploymentNode deploymentNode = (DeploymentNode) node; diff --git a/java/serve/src/main/java/io/ray/serve/deployment/Deployment.java b/java/serve/src/main/java/io/ray/serve/deployment/Deployment.java index 64fc4be0a6ad..ded2c0f3cad7 100644 --- a/java/serve/src/main/java/io/ray/serve/deployment/Deployment.java +++ b/java/serve/src/main/java/io/ray/serve/deployment/Deployment.java @@ -10,6 +10,7 @@ import io.ray.serve.handle.DeploymentHandle; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,12 +32,30 @@ public class Deployment { private final String version; + private String routePrefix; + + private final String url; + private boolean ingress; // TODO support placement group. public Deployment( - String name, DeploymentConfig deploymentConfig, ReplicaConfig replicaConfig, String version) { + String name, + DeploymentConfig deploymentConfig, + ReplicaConfig replicaConfig, + String version, + String routePrefix) { + + if (StringUtils.isNotBlank(routePrefix)) { + Preconditions.checkArgument(routePrefix.startsWith("/"), "route_prefix must start with '/'."); + Preconditions.checkArgument( + routePrefix.equals("/") || !routePrefix.endsWith("/"), + "route_prefix must not end with '/' unless it's the root."); + Preconditions.checkArgument( + !routePrefix.contains("{") && !routePrefix.contains("}"), + "route_prefix may not contain wildcards."); + } Preconditions.checkArgument( version != null || deploymentConfig.getAutoscalingConfig() == null, @@ -46,6 +65,8 @@ public Deployment( this.version = version; this.deploymentConfig = deploymentConfig; this.replicaConfig = replicaConfig; + this.routePrefix = routePrefix; + this.url = routePrefix != null ? Serve.getGlobalClient().getRootUrl() + routePrefix : null; } /** @@ -76,6 +97,7 @@ public DeploymentCreator options() { .setVersion(this.version) .setNumReplicas(this.deploymentConfig.getNumReplicas()) .setInitArgs(this.replicaConfig.getInitArgs()) + .setRoutePrefix(this.routePrefix) .setRayActorOptions(this.replicaConfig.getRayActorOptions()) .setUserConfig(this.deploymentConfig.getUserConfig()) .setMaxOngoingRequests(this.deploymentConfig.getMaxOngoingRequests()) @@ -125,6 +147,18 @@ public String getVersion() { return version; } + public String getRoutePrefix() { + return routePrefix; + } + + public String getUrl() { + return url; + } + + public void setRoutePrefix(String routePrefix) { + this.routePrefix = routePrefix; + } + public boolean isIngress() { return ingress; } diff --git a/java/serve/src/main/java/io/ray/serve/deployment/DeploymentCreator.java b/java/serve/src/main/java/io/ray/serve/deployment/DeploymentCreator.java index 1515d8eb0450..99009b98c427 100644 --- a/java/serve/src/main/java/io/ray/serve/deployment/DeploymentCreator.java +++ b/java/serve/src/main/java/io/ray/serve/deployment/DeploymentCreator.java @@ -43,6 +43,16 @@ public class DeploymentCreator { */ private Object[] initArgs; + /** + * Requests to paths under this HTTP path prefix will be routed to this deployment. Defaults to + * '/{name}'. When set to 'None', no HTTP endpoint will be created. Routing is done based on + * longest-prefix match, so if you have deployment A with a prefix of '/a' and deployment B with a + * prefix of '/a/b', requests to '/a', '/a/', and '/a/c' go to A and requests to '/a/b', '/a/b/', + * and '/a/b/c' go to B. Routes must not end with a '/' unless they're the root (just '/'), which + * acts as a catch-all. + */ + @Deprecated private String routePrefix; + /** Options to be passed to the Ray actor constructor such as resource requirements. */ private Map rayActorOptions; @@ -87,6 +97,10 @@ public Deployment create(boolean check) { LOGGER.warn( "DeprecationWarning: `version` in `@serve.deployment` has been deprecated. Explicitly specifying version will raise an error in the future!"); } + if (routePrefix != null) { + LOGGER.warn( + "DeprecationWarning: `route_prefix` in `@serve.deployment` has been deprecated. To specify a route prefix for an application, pass it into `serve.run` instead."); + } DeploymentConfig deploymentConfig = new DeploymentConfig() @@ -106,7 +120,8 @@ public Deployment create(boolean check) { StringUtils.isNotBlank(name) ? name : CommonUtil.getDeploymentName(deploymentDef), deploymentConfig, replicaConfig, - version); + version, + routePrefix); } public Deployment create() { @@ -162,6 +177,15 @@ public DeploymentCreator setInitArgs(Object[] initArgs) { return this; } + public String getRoutePrefix() { + return routePrefix; + } + + public DeploymentCreator setRoutePrefix(String routePrefix) { + this.routePrefix = routePrefix; + return this; + } + public Map getRayActorOptions() { return rayActorOptions; } diff --git a/java/serve/src/test/java/io/ray/serve/deployment/CrossLanguageDeploymentTest.java b/java/serve/src/test/java/io/ray/serve/deployment/CrossLanguageDeploymentTest.java index d8ad835df017..21cbf94ad946 100644 --- a/java/serve/src/test/java/io/ray/serve/deployment/CrossLanguageDeploymentTest.java +++ b/java/serve/src/test/java/io/ray/serve/deployment/CrossLanguageDeploymentTest.java @@ -56,7 +56,7 @@ public void createPyClassTest() { .setNumReplicas(1) .bind("28"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.method("increase").remote("6").result(), "34"); } @@ -70,7 +70,7 @@ public void createPyClassWithObjectRefTest() { .setNumReplicas(1) .bind("28"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); ObjectRef numRef = Ray.put(10); Assert.assertEquals(handle.method("increase").remote(numRef).result(), "38"); } @@ -84,7 +84,7 @@ public void createPyMethodTest() { .setDeploymentDef(PYTHON_MODULE + ".echo_server") .setNumReplicas(1) .bind(); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.method("__call__").remote("6").result(), "6"); } @@ -97,7 +97,7 @@ public void createPyMethodWithObjectRefTest() { .setDeploymentDef(PYTHON_MODULE + ".echo_server") .setNumReplicas(1) .bind(); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); ObjectRef numRef = Ray.put("10"); Assert.assertEquals(handle.method("__call__").remote(numRef).result(), "10"); } @@ -112,7 +112,7 @@ public void userConfigTest() throws InterruptedException { .setNumReplicas(1) .setUserConfig("1") .bind("28"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.method("increase").remote("6").result(), "7"); // deployment.options().setUserConfig("3").create().deploy(true); // TimeUnit.SECONDS.sleep(20L); diff --git a/java/serve/src/test/java/io/ray/serve/deployment/DeploymentGraphTest.java b/java/serve/src/test/java/io/ray/serve/deployment/DeploymentGraphTest.java index 159a4235ca2f..722bf4b7d687 100644 --- a/java/serve/src/test/java/io/ray/serve/deployment/DeploymentGraphTest.java +++ b/java/serve/src/test/java/io/ray/serve/deployment/DeploymentGraphTest.java @@ -68,7 +68,7 @@ public void bindTest() { Application deployment = Serve.deployment().setDeploymentDef(Counter.class.getName()).setNumReplicas(1).bind("2"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.remote("2").result(), "4"); } @@ -108,7 +108,7 @@ public void testPassHandle() { Application driver = Serve.deployment().setDeploymentDef(Combiner.class.getName()).bind(modelA, modelB); - DeploymentHandle handle = Serve.run(driver); + DeploymentHandle handle = Serve.run(driver).get(); Assert.assertEquals(handle.remote("test").result(), "A:test,B:test"); } diff --git a/java/serve/src/test/java/io/ray/serve/deployment/DeploymentTest.java b/java/serve/src/test/java/io/ray/serve/deployment/DeploymentTest.java index edd615169b13..649a318ec996 100644 --- a/java/serve/src/test/java/io/ray/serve/deployment/DeploymentTest.java +++ b/java/serve/src/test/java/io/ray/serve/deployment/DeploymentTest.java @@ -31,7 +31,7 @@ public void deployTest() { .setNumReplicas(1) .setUserConfig("_test") .bind("echo_"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.method("call").remote("6").result(), "echo_6_test"); Assert.assertTrue((boolean) handle.method("checkHealth").remote().result()); } @@ -100,7 +100,7 @@ public void autoScaleTest() { .setVersion("v1") .bind("echo_"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); Assert.assertEquals(handle.method("call").remote("6").result(), "echo_6_test"); } diff --git a/java/serve/src/test/java/io/ray/serve/docdemo/ManagePythonDeployment.java b/java/serve/src/test/java/io/ray/serve/docdemo/ManagePythonDeployment.java index 4f1e8aeec7ad..ef822b73095c 100644 --- a/java/serve/src/test/java/io/ray/serve/docdemo/ManagePythonDeployment.java +++ b/java/serve/src/test/java/io/ray/serve/docdemo/ManagePythonDeployment.java @@ -23,7 +23,7 @@ public static void main(String[] args) { .setDeploymentDef("counter.Counter") .setNumReplicas(1) .bind("1"); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); System.out.println(handle.method("increase").remote("2").result()); } diff --git a/java/serve/src/test/java/io/ray/serve/repdemo/DeploymentDemo.java b/java/serve/src/test/java/io/ray/serve/repdemo/DeploymentDemo.java index 6eaed2afc1f6..116825501238 100644 --- a/java/serve/src/test/java/io/ray/serve/repdemo/DeploymentDemo.java +++ b/java/serve/src/test/java/io/ray/serve/repdemo/DeploymentDemo.java @@ -18,7 +18,7 @@ public String call() { public static void main(String[] args) { Application deployment = Serve.deployment().setDeploymentDef(DeploymentDemo.class.getName()).bind(); - DeploymentHandle handle = Serve.run(deployment); + DeploymentHandle handle = Serve.run(deployment).get(); System.out.println(handle.remote().result()); } } diff --git a/python/ray/serve/_private/api.py b/python/ray/serve/_private/api.py index 97641dc47f32..0e4d25188fb6 100644 --- a/python/ray/serve/_private/api.py +++ b/python/ray/serve/_private/api.py @@ -20,13 +20,51 @@ from ray.serve._private.controller import ServeController from ray.serve.config import HTTPOptions, gRPCOptions from ray.serve.context import _get_global_client, _set_global_client -from ray.serve.deployment import Application +from ray.serve.deployment import Application, Deployment from ray.serve.exceptions import RayServeException from ray.serve.schema import LoggingConfig logger = logging.getLogger(SERVE_LOGGER_NAME) +def get_deployment(name: str, app_name: str = ""): + """Dynamically fetch a handle to a Deployment object. + + Args: + name: name of the deployment. This must have already been + deployed. + + Returns: + Deployment + """ + try: + ( + deployment_info, + route_prefix, + ) = _get_global_client().get_deployment_info(name, app_name) + except KeyError: + if len(app_name) == 0: + msg = ( + f"Deployment {name} was not found. Did you call Deployment.deploy()? " + "Note that `serve.get_deployment()` can only be used to fetch a " + "deployment that was deployed using the 1.x API `Deployment.deploy()`. " + "If you want to fetch a handle to an application deployed through " + "`serve.run` or through a Serve config, please use " + "`serve.get_app_handle()` instead." + ) + else: + msg = f"Deployment {name} in application {app_name} was not found." + raise KeyError(msg) + return Deployment( + name, + deployment_info.deployment_config, + deployment_info.replica_config, + version=deployment_info.version, + route_prefix=route_prefix, + _internal=True, + ) + + def _check_http_options( client: ServeControllerClient, http_options: Union[dict, HTTPOptions] ) -> None: diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index 55d273fd7cb4..50432b610733 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -37,7 +37,6 @@ DEFAULT, check_obj_ref_ready_nowait, override_runtime_envs_except_env_vars, - validate_route_prefix, ) from ray.serve.config import AutoscalingConfig from ray.serve.exceptions import RayServeException @@ -932,10 +931,7 @@ def deploy_app(self, name: str, deployment_args: List[Dict]) -> None: } for deploy_param in deployment_args: - deploy_app_prefix = deploy_param.get("route_prefix", None) - if deploy_app_prefix is None: - continue - + deploy_app_prefix: str = deploy_param["route_prefix"] app_name = live_route_prefixes.get(deploy_app_prefix) if app_name is not None: raise RayServeException( @@ -1143,6 +1139,9 @@ def build_serve_application( try: from ray.serve._private.api import call_app_builder_with_args_if_necessary from ray.serve._private.deployment_graph_build import build as pipeline_build + from ray.serve._private.deployment_graph_build import ( + get_and_validate_ingress_deployment, + ) # Import and build the application. args_info_str = f" with arguments {args}" if args else "" @@ -1150,7 +1149,7 @@ def build_serve_application( app = call_app_builder_with_args_if_necessary(import_attr(import_path), args) deployments = pipeline_build(app._get_internal_dag_node(), name) - ingress = deployments[-1] + ingress = get_and_validate_ingress_deployment(deployments) deploy_args_list = [] for deployment in deployments: @@ -1162,7 +1161,7 @@ def build_serve_application( ingress=is_ingress, deployment_config=deployment._deployment_config, version=code_version, - route_prefix="/" if is_ingress else None, + route_prefix=deployment.route_prefix, docs_path=deployment._docs_path, ) ) @@ -1214,11 +1213,6 @@ def override_deployment_info( options["max_ongoing_requests"] = options.get("max_ongoing_requests") deployment_name = options["name"] - if deployment_name not in deployment_infos: - raise ValueError( - f"Got config override for nonexistent deployment '{deployment_name}'" - ) - info = deployment_infos[deployment_name] original_options = info.deployment_config.dict() original_options["user_configured_option_names"].update(set(options)) @@ -1245,6 +1239,11 @@ def override_deployment_info( # What to pass to info.update override_options = dict() + # Override route prefix if specified in deployment config + deployment_route_prefix = options.pop("route_prefix", DEFAULT.VALUE) + if deployment_route_prefix is not DEFAULT.VALUE: + override_options["route_prefix"] = deployment_route_prefix + # Merge app-level and deployment-level runtime_envs. replica_config = info.replica_config app_runtime_env = override_config.runtime_env @@ -1307,7 +1306,6 @@ def override_deployment_info( # Overwrite ingress route prefix app_route_prefix = config_dict.get("route_prefix", DEFAULT.VALUE) - validate_route_prefix(app_route_prefix) for deployment in list(deployment_infos.values()): if ( app_route_prefix is not DEFAULT.VALUE diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index c5d6b90da6bb..390fe4580055 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -2,7 +2,7 @@ import random import time from functools import wraps -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union import ray from ray.actor import ActorHandle @@ -251,12 +251,8 @@ def deploy_application( deployments: List[Dict], _blocking: bool = True, ): - ingress_route_prefix = None deployment_args_list = [] for deployment in deployments: - if deployment["ingress"]: - ingress_route_prefix = deployment["route_prefix"] - deployment_args = get_deploy_args( deployment["name"], replica_config=deployment["replica_config"], @@ -287,11 +283,13 @@ def deploy_application( ray.get(self._controller.deploy_application.remote(name, deployment_args_list)) if _blocking: self._wait_for_application_running(name) - if ingress_route_prefix is not None: - url_part = " at " + self._root_url + ingress_route_prefix - else: - url_part = "" - logger.info(f"Application '{name}' is ready{url_part}.") + for deployment in deployments: + deployment_name = deployment["name"] + tag = f"component=serve deployment={deployment_name}" + url = deployment["url"] + version = deployment["version"] + + self.log_deployment_ready(deployment_name, version, url, tag) @_ensure_connected def deploy_apps( @@ -365,6 +363,15 @@ def delete_all_apps(self, blocking: bool = True): all_apps.append(status.name) self.delete_apps(all_apps, blocking) + @_ensure_connected + def delete_deployments(self, names: Iterable[str], blocking: bool = True) -> None: + """Delete 1.x deployments.""" + + ray.get(self._controller.delete_deployments.remote(names)) + if blocking: + for name in names: + self._wait_for_deployment_deleted(name, "") + @_ensure_connected def get_deployment_info( self, name: str, app_name: str @@ -461,6 +468,36 @@ def get_handle( return handle + @_ensure_connected + def log_deployment_update_status( + self, name: str, version: str, updating: bool + ) -> str: + tag = f"component=serve deployment={name}" + + if updating: + msg = f"Updating deployment '{name}'" + if version is not None: + msg += f" to version '{version}'" + logger.info(f"{msg}. {tag}") + else: + logger.info( + f"Deployment '{name}' is already at version " + f"'{version}', not updating. {tag}" + ) + + return tag + + @_ensure_connected + def log_deployment_ready(self, name: str, version: str, url: str, tag: str) -> None: + if url is not None: + url_part = f" at `{url}`" + else: + url_part = "" + logger.info( + f"Deployment '{name}{':'+version if version else ''}' is ready" + f"{url_part}. {tag}" + ) + @_ensure_connected def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): """Record multiplexed replica information for replica. diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 8eff4c80315a..5a1606d6f311 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -48,6 +48,7 @@ from ray.serve._private.storage.kv_store import RayInternalKVStore from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import ( + DEFAULT, call_function_from_import_path, get_all_live_placement_group_names, get_head_node_id, @@ -333,8 +334,8 @@ def get_all_endpoints_java(self) -> bytes: # NOTE(zcin): Java only supports 1.x deployments, so only return # a dictionary of deployment name -> endpoint info data = { - endpoint_tag.name: EndpointInfoProto(route=endpoint_dict["route"]) - for endpoint_tag, endpoint_dict in endpoints.items() + endpoint_tag.name: EndpointInfoProto(route=endppint_dict["route"]) + for endpoint_tag, endppint_dict in endpoints.items() } return EndpointSet(endpoints=data).SerializeToString() @@ -738,12 +739,12 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No "deployment_config_proto_bytes": deployment_args.deployment_config, "replica_config_proto_bytes": deployment_args.replica_config, "deployer_job_id": deployment_args.deployer_job_id, - "ingress": deployment_args.ingress, "route_prefix": ( deployment_args.route_prefix if deployment_args.HasField("route_prefix") else None ), + "ingress": deployment_args.ingress, "docs_path": ( deployment_args.docs_path if deployment_args.HasField("docs_path") @@ -786,6 +787,14 @@ def apply_config( self._target_capacity = config.target_capacity for app_config in config.applications: + for deployments in app_config.deployments: + if deployments.route_prefix != DEFAULT.VALUE: + logger.warning( + "Specifying route prefix for a deployment is deprecated. " + "Please specify route prefix at an application level in the " + "Serve config instead." + ) + # If the application logging config is not set, use the global logging # config. if app_config.logging_config is None and config.logging_config: @@ -865,7 +874,9 @@ def get_serve_instance_details(self) -> Dict: error messages, etc. Returns: - Dict that follows the format of the schema ServeInstanceDetails. + Dict that follows the format of the schema ServeInstanceDetails. Currently, + there is a value set for every field at all schema levels, except for the + route_prefix in the deployment_config for each deployment. """ http_config = self.get_http_config() @@ -902,7 +913,10 @@ def get_serve_instance_details(self) -> Dict: ) # NOTE(zcin): We use exclude_unset here because we explicitly and intentionally - # fill in all info that should be shown to users. + # fill in all info that should be shown to users. Currently, every field is set + # except for the route_prefix in the deployment_config of each deployment, since + # route_prefix is set instead in each application. + # Eventually we want to remove route_prefix from DeploymentSchema. http_options = HTTPOptionsSchema.parse_obj(http_config.dict(exclude_unset=True)) grpc_options = gRPCOptionsSchema.parse_obj(grpc_config.dict(exclude_unset=True)) return ServeInstanceDetails( diff --git a/python/ray/serve/_private/deploy_utils.py b/python/ray/serve/_private/deploy_utils.py index 405ab8f18909..24bfacde6450 100644 --- a/python/ray/serve/_private/deploy_utils.py +++ b/python/ray/serve/_private/deploy_utils.py @@ -67,10 +67,10 @@ def deploy_args_to_deployment_info( deployment_config_proto_bytes: bytes, replica_config_proto_bytes: bytes, deployer_job_id: Union[str, bytes], + route_prefix: Optional[str], docs_path: Optional[str], app_name: Optional[str] = None, ingress: bool = False, - route_prefix: Optional[str] = None, **kwargs, ) -> DeploymentInfo: """Takes deployment args passed to the controller after building an application and diff --git a/python/ray/serve/_private/deployment_function_node.py b/python/ray/serve/_private/deployment_function_node.py index 031faf910e46..c82f118c3bf9 100644 --- a/python/ray/serve/_private/deployment_function_node.py +++ b/python/ray/serve/_private/deployment_function_node.py @@ -36,9 +36,20 @@ def __init__( ] deployment_shell = schema_to_deployment(deployment_schema) + # Set the route prefix, prefer the one user supplied, + # otherwise set it to /deployment_name + if ( + deployment_shell.route_prefix is None + or deployment_shell.route_prefix != f"/{deployment_shell.name}" + ): + route_prefix = deployment_shell.route_prefix + else: + route_prefix = f"/{deployment_name}" + self._deployment = deployment_shell.options( func_or_class=func_body, name=self._deployment_name, + route_prefix=route_prefix, _init_args=(), _init_kwargs={}, _internal=True, diff --git a/python/ray/serve/_private/deployment_graph_build.py b/python/ray/serve/_private/deployment_graph_build.py index f5ef841c7e58..c3c05af635ad 100644 --- a/python/ray/serve/_private/deployment_graph_build.py +++ b/python/ray/serve/_private/deployment_graph_build.py @@ -92,8 +92,34 @@ def build( "you're using the DAG API, the function should be bound to a DAGDriver." ) - # The last deployment in the list is the ingress. - return deployments + # Validate and only expose HTTP for the endpoint. + deployments_with_http = process_ingress_deployment_in_serve_dag(deployments) + return deployments_with_http + + +def get_and_validate_ingress_deployment( + deployments: List[Deployment], +) -> Deployment: + """Validation for http route prefixes for a list of deployments in pipeline. + + Ensures: + 1) One and only one ingress deployment with given route prefix. + 2) All other not ingress deployments should have prefix of None. + """ + + ingress_deployments = [] + for deployment in deployments: + if deployment.route_prefix is not None: + ingress_deployments.append(deployment) + + if len(ingress_deployments) != 1: + raise ValueError( + "Only one deployment in an Serve Application or DAG can have " + f"non-None route prefix. {len(ingress_deployments)} ingress " + f"deployments found: {ingress_deployments}" + ) + + return ingress_deployments[0] def transform_ray_dag_to_serve_dag( @@ -142,9 +168,20 @@ def transform_ray_dag_to_serve_dag( ): deployment_name = deployment_shell.name + # Set the route prefix, prefer the one user supplied, + # otherwise set it to /deployment_name + if ( + deployment_shell.route_prefix is None + or deployment_shell.route_prefix != f"/{deployment_shell.name}" + ): + route_prefix = deployment_shell.route_prefix + else: + route_prefix = f"/{deployment_name}" + deployment = deployment_shell.options( func_or_class=dag_node._body, name=deployment_name, + route_prefix=route_prefix, _init_args=replaced_deployment_init_args, _init_kwargs=replaced_deployment_init_kwargs, _internal=True, @@ -220,3 +257,44 @@ def extractor(dag_node): serve_dag_root.apply_recursive(extractor) return list(deployments.values()) + + +def process_ingress_deployment_in_serve_dag( + deployments: List[Deployment], +) -> List[Deployment]: + """Mark the last fetched deployment in a serve dag as exposed with default + prefix. + """ + if len(deployments) == 0: + return deployments + + # Last element of the list is the root deployment if it's applicable type + # that wraps an deployment, given Ray DAG traversal is done bottom-up. + ingress_deployment = deployments[-1] + if ingress_deployment.route_prefix in [None, f"/{ingress_deployment.name}"]: + # Override default prefix to "/" on the ingress deployment, if user + # didn't provide anything in particular. + new_ingress_deployment = ingress_deployment.options( + route_prefix="/", + _internal=True, + ) + deployments[-1] = new_ingress_deployment + + # Erase all non ingress deployment route prefix + for i, deployment in enumerate(deployments[:-1]): + if ( + deployment.route_prefix is not None + and deployment.route_prefix != f"/{deployment.name}" + ): + raise ValueError( + "Route prefix is only configurable on the ingress deployment. " + "Please do not set non-default route prefix: " + f"{deployment.route_prefix} on non-ingress deployment of the " + "serve DAG. " + ) + else: + # Erase all default prefix to None for non-ingress deployments to + # disable HTTP + deployments[i] = deployment.options(route_prefix=None, _internal=True) + + return deployments diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 78851fc419fe..8bf7e2f9d859 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -578,25 +578,3 @@ def get_component_file_name( suffix=suffix, ) return file_name - - -def validate_route_prefix(route_prefix: Union[DEFAULT, None, str]): - if route_prefix is DEFAULT.VALUE or route_prefix is None: - return - - if not route_prefix.startswith("/"): - raise ValueError( - f"Invalid route_prefix '{route_prefix}', " - "must start with a forward slash ('/')." - ) - - if route_prefix != "/" and route_prefix.endswith("/"): - raise ValueError( - f"Invalid route_prefix '{route_prefix}', " - "may not end with a trailing '/'." - ) - - if "{" in route_prefix or "}" in route_prefix: - raise ValueError( - f"Invalid route_prefix '{route_prefix}', " "may not contain wildcards." - ) diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 512c328f134d..d49f74a98fd7 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -18,6 +18,9 @@ ) from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_LOGGER_NAME from ray.serve._private.deployment_graph_build import build as pipeline_build +from ray.serve._private.deployment_graph_build import ( + get_and_validate_ingress_deployment, +) from ray.serve._private.http_util import ( ASGIAppReplicaWrapper, make_fastapi_class_based_view, @@ -29,7 +32,6 @@ ensure_serialization_context, extract_self_if_method_call, get_random_string, - validate_route_prefix, ) from ray.serve.config import ( AutoscalingConfig, @@ -280,6 +282,8 @@ class MyDeployment: this deployment. Defaults to 1. autoscaling_config: Parameters to configure autoscaling behavior. If this is set, `num_replicas` cannot be set. + route_prefix: [DEPRECATED] Route prefix should be set per-application + through `serve.run()` or the config file. ray_actor_options: Options to pass to the Ray Actor decorator, such as resource requirements. Valid options are: `accelerator_type`, `memory`, `num_cpus`, `num_gpus`, `resources`, and `runtime_env`. @@ -321,11 +325,6 @@ class MyDeployment: Returns: `Deployment` """ - if route_prefix is not DEFAULT.VALUE: - raise ValueError( - "`route_prefix` can no longer be specified at the deployment level. " - "Pass it to `serve.run` or in the application config instead." - ) if max_ongoing_requests is None: raise ValueError("`max_ongoing_requests` must be non-null, got None.") @@ -368,6 +367,13 @@ class MyDeployment: "Explicitly specifying version will raise an error in the future!" ) + if route_prefix is not DEFAULT.VALUE: + logger.warning( + "DeprecationWarning: `route_prefix` in `@serve.deployment` has been " + "deprecated. To specify a route prefix for an application, pass it into " + "`serve.run` instead." + ) + if isinstance(logging_config, LoggingConfig): logging_config = logging_config.dict() @@ -415,6 +421,7 @@ def decorator(_func_or_class): deployment_config, replica_config, version=(version if version is not DEFAULT.VALUE else None), + route_prefix=route_prefix, _internal=True, ) @@ -428,7 +435,7 @@ def _run( target: Application, _blocking: bool = True, name: str = SERVE_DEFAULT_APP_NAME, - route_prefix: Optional[str] = "/", + route_prefix: str = DEFAULT.VALUE, logging_config: Optional[Union[Dict, LoggingConfig]] = None, ) -> DeploymentHandle: """Run an application and return a handle to its ingress deployment. @@ -439,8 +446,6 @@ def _run( if len(name) == 0: raise RayServeException("Application name must a non-empty string.") - validate_route_prefix(route_prefix) - client = _private_api.serve_start( http_options={"location": "EveryNode"}, ) @@ -450,7 +455,7 @@ def _run( if isinstance(target, Application): deployments = pipeline_build(target._get_internal_dag_node(), name) - ingress_deployment_name = deployments[-1].name + ingress = get_and_validate_ingress_deployment(deployments) else: msg = "`serve.run` expects an `Application` returned by `Deployment.bind()`." if isinstance(target, DAGNode): @@ -461,33 +466,44 @@ def _run( raise TypeError(msg) parameter_group = [] + for deployment in deployments: - is_ingress = deployment._name == ingress_deployment_name - if deployment.logging_config is None and logging_config: - deployment = deployment.options(logging_config=logging_config) + # Overwrite route prefix + if route_prefix is not DEFAULT.VALUE and deployment._route_prefix is not None: + if route_prefix is not None and not route_prefix.startswith("/"): + raise ValueError( + "The route_prefix must start with a forward slash ('/')" + ) + deployment._route_prefix = route_prefix + if deployment.logging_config is None and logging_config: + if isinstance(logging_config, dict): + logging_config = LoggingConfig(**logging_config) + deployment.set_logging_config(logging_config.dict()) deployment_parameters = { "name": deployment._name, "replica_config": deployment._replica_config, "deployment_config": deployment._deployment_config, "version": deployment._version or get_random_string(), - "route_prefix": route_prefix if is_ingress else None, + "route_prefix": deployment.route_prefix, + "url": deployment.url, "docs_path": deployment._docs_path, - "ingress": is_ingress, + "ingress": deployment._name == ingress._name, } parameter_group.append(deployment_parameters) - client.deploy_application( name, parameter_group, _blocking=_blocking, ) - # The deployment state is not guaranteed to be created after - # deploy_application returns; the application state manager will - # need another reconcile iteration to create it. - client._wait_for_deployment_created(ingress_deployment_name, name) - return client.get_handle(ingress_deployment_name, name, check_exists=False) + if ingress is not None: + # The deployment state is not guaranteed to be created after + # deploy_application returns; the application state manager will + # need another reconcile iteration to create it. + client._wait_for_deployment_created(ingress.name, name) + handle = client.get_handle(ingress.name, name, check_exists=False) + return handle @PublicAPI(stability="stable") @@ -495,7 +511,7 @@ def run( target: Application, blocking: bool = False, name: str = SERVE_DEFAULT_APP_NAME, - route_prefix: Optional[str] = "/", + route_prefix: Optional[str] = DEFAULT.VALUE, logging_config: Optional[Union[Dict, LoggingConfig]] = None, ) -> DeploymentHandle: """Run an application and return a handle to its ingress deployment. diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index e621cf88434d..54c34d878c31 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -15,6 +15,7 @@ from ray.serve._private.usage import ServeUsageTag from ray.serve._private.utils import DEFAULT, Default from ray.serve.config import AutoscalingConfig +from ray.serve.context import _get_global_client from ray.serve.schema import DeploymentSchema, LoggingConfig, RayActorOptionsSchema from ray.util.annotations import PublicAPI @@ -116,6 +117,7 @@ def __init__( deployment_config: DeploymentConfig, replica_config: ReplicaConfig, version: Optional[str] = None, + route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE, _internal=False, ) -> None: if not _internal: @@ -127,6 +129,18 @@ def __init__( raise TypeError("name must be a string.") if not (version is None or isinstance(version, str)): raise TypeError("version must be a string.") + if route_prefix is not DEFAULT.VALUE and route_prefix is not None: + if not isinstance(route_prefix, str): + raise TypeError("route_prefix must be a string.") + if not route_prefix.startswith("/"): + raise ValueError("route_prefix must start with '/'.") + if route_prefix != "/" and route_prefix.endswith("/"): + raise ValueError( + "route_prefix must not end with '/' unless it's the root." + ) + if "{" in route_prefix or "}" in route_prefix: + raise ValueError("route_prefix may not contain wildcards.") + docs_path = None if ( inspect.isclass(replica_config.deployment_def) @@ -140,6 +154,7 @@ def __init__( self._version = version self._deployment_config = deployment_config self._replica_config = replica_config + self._route_prefix = route_prefix self._docs_path = docs_path @property @@ -177,11 +192,11 @@ def max_queued_requests(self) -> int: return self._deployment_config.max_queued_requests @property - def route_prefix(self): - raise ValueError( - "`route_prefix` can no longer be specified at the deployment level. " - "Pass it to `serve.run` or in the application config instead." - ) + def route_prefix(self) -> Optional[str]: + """HTTP route prefix that this deployment is exposed under.""" + if self._route_prefix is DEFAULT.VALUE: + return f"/{self._name}" + return self._route_prefix @property def ray_actor_options(self) -> Optional[Dict]: @@ -198,11 +213,11 @@ def init_kwargs(self) -> Tuple[Any]: @property def url(self) -> Optional[str]: - logger.warning( - "DeprecationWarning: `Deployment.url` is deprecated " - "and will be removed in the future." - ) - return None + if self._route_prefix is None: + # this deployment is not exposed over HTTP + return None + + return _get_global_client().root_url + self.route_prefix @property def logging_config(self) -> Dict: @@ -250,6 +265,45 @@ def bind(self, *args, **kwargs) -> Application: return Application._from_internal_dag_node(dag_node) + def _deploy(self, *init_args, _blocking=True, **init_kwargs): + """Deploy or update this deployment. + + Args: + init_args: args to pass to the class __init__ + method. Not valid if this deployment wraps a function. + init_kwargs: kwargs to pass to the class __init__ + method. Not valid if this deployment wraps a function. + """ + if len(init_args) == 0 and self._replica_config.init_args is not None: + init_args = self._replica_config.init_args + if len(init_kwargs) == 0 and self._replica_config.init_kwargs is not None: + init_kwargs = self._replica_config.init_kwargs + + replica_config = ReplicaConfig.create( + self._replica_config.deployment_def, + init_args=init_args, + init_kwargs=init_kwargs, + ray_actor_options=self._replica_config.ray_actor_options, + placement_group_bundles=self._replica_config.placement_group_bundles, + placement_group_strategy=self._replica_config.placement_group_strategy, + max_replicas_per_node=self._replica_config.max_replicas_per_node, + ) + + return _get_global_client().deploy( + self._name, + replica_config=replica_config, + deployment_config=self._deployment_config, + version=self._version, + route_prefix=self.route_prefix, + url=self.url, + _blocking=_blocking, + ) + + def _delete(self): + """Delete this deployment.""" + + return _get_global_client().delete_deployments([self._name]) + def options( self, func_or_class: Optional[Callable] = None, @@ -283,11 +337,6 @@ def options( Refer to the `@serve.deployment` decorator docs for available arguments. """ - if route_prefix is not DEFAULT.VALUE: - raise ValueError( - "`route_prefix` can no longer be specified at the deployment level. " - "Pass it to `serve.run` or in the application config instead." - ) # Modify max_ongoing_requests and autoscaling_config if # `num_replicas="auto"` @@ -341,6 +390,13 @@ def options( "future!" ) + if not _internal and route_prefix is not DEFAULT.VALUE: + logger.warning( + "DeprecationWarning: `route_prefix` in `@serve.deployment` has been " + "deprecated. To specify a route prefix for an application, pass it " + "into `serve.run` instead." + ) + elif num_replicas not in [DEFAULT.VALUE, None]: new_deployment_config.num_replicas = num_replicas @@ -368,6 +424,10 @@ def options( if _init_kwargs is DEFAULT.VALUE: _init_kwargs = self._replica_config.init_kwargs + if route_prefix is DEFAULT.VALUE: + # Default is to keep the previous value + route_prefix = self._route_prefix + if ray_actor_options is DEFAULT.VALUE: ray_actor_options = self._replica_config.ray_actor_options @@ -419,6 +479,7 @@ def options( new_deployment_config, new_replica_config, version=version, + route_prefix=route_prefix, _internal=True, ) @@ -430,25 +491,35 @@ def __eq__(self, other): self._deployment_config == other._deployment_config, self._replica_config.init_args == other._replica_config.init_args, self._replica_config.init_kwargs == other._replica_config.init_kwargs, + # compare route prefix with default value resolved + self.route_prefix == other.route_prefix, self._replica_config.ray_actor_options == self._replica_config.ray_actor_options, ] ) def __str__(self): - return f"Deployment(name={self._name})" + return ( + f"Deployment(name={self._name}," + f"version={self._version}," + f"route_prefix={self.route_prefix})" + ) def __repr__(self): return str(self) def deployment_to_schema( - d: Deployment, + d: Deployment, include_route_prefix: bool = True ) -> DeploymentSchema: """Converts a live deployment object to a corresponding structured schema. Args: d: Deployment object to convert + include_route_prefix: Whether to include the route_prefix in the returned + schema. This should be set to False if the schema will be included in a + higher-level object describing an application, and you want to place + route_prefix at the application level. """ if d.ray_actor_options is not None: @@ -476,6 +547,9 @@ def deployment_to_schema( "logging_config": d._deployment_config.logging_config, } + if include_route_prefix: + deployment_options["route_prefix"] = d.route_prefix + # Let non-user-configured options be set to defaults. If the schema # is converted back to a deployment, this lets Serve continue tracking # which options were set by the user. Name is a required field in the @@ -553,5 +627,6 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: name=s.name, deployment_config=deployment_config, replica_config=replica_config, + route_prefix=s.route_prefix, _internal=True, ) diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 5556b341af36..e04684cf3aa6 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -278,6 +278,13 @@ class DeploymentSchema(BaseModel, allow_population_by_field_name=True): "(experimental)." ), ) + # route_prefix of None means the deployment is not exposed over HTTP. + route_prefix: Union[str, None] = Field( + default=DEFAULT.VALUE, + description=( + "[DEPRECATED] Please use route_prefix under ServeApplicationSchema instead." + ), + ) max_ongoing_requests: int = Field( default=DEFAULT.VALUE, description=( @@ -437,6 +444,10 @@ def validate_max_queued_requests(cls, values): return values + deployment_schema_route_prefix_format = validator("route_prefix", allow_reuse=True)( + _route_prefix_format + ) + def _get_user_configured_option_names(self) -> Set[str]: """Get set of names for all user-configured options. @@ -449,7 +460,12 @@ def _get_user_configured_option_names(self) -> Set[str]: def _deployment_info_to_schema(name: str, info: DeploymentInfo) -> DeploymentSchema: - """Converts a DeploymentInfo object to DeploymentSchema.""" + """Converts a DeploymentInfo object to DeploymentSchema. + + Route_prefix will not be set in the returned DeploymentSchema, since starting in 2.x + route_prefix is an application-level concept. (This should only be used on the 2.x + codepath) + """ schema = DeploymentSchema( name=name, @@ -941,6 +957,21 @@ class DeploymentDetails(BaseModel, extra=Extra.forbid, frozen=True): description="Details about the live replicas of this deployment." ) + @validator("deployment_config") + def deployment_route_prefix_not_set(cls, v: DeploymentSchema): + # Route prefix should not be set at the deployment level. Deployment-level route + # prefix is outdated, there should be one route prefix per application + if ( + "route_prefix" in v.__fields_set__ + ): # in Pydantic v2, this becomes `in v.model_fields_set` + raise ValueError( + "Unexpectedly found a deployment-level route_prefix in the " + f'deployment_config for deployment "{v.name}". The route_prefix in ' + "deployment_config within DeploymentDetails should not be set; please " + "set it at the application level." + ) + return v + @PublicAPI(stability="alpha") class APIType(str, Enum): diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 270dc4a032b4..dbad99756546 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -27,6 +27,10 @@ SERVE_NAMESPACE, ) from ray.serve._private.deployment_graph_build import build as pipeline_build +from ray.serve._private.deployment_graph_build import ( + get_and_validate_ingress_deployment, +) +from ray.serve._private.utils import DEFAULT from ray.serve.config import DeploymentMode, ProxyLocation, gRPCOptions from ray.serve.deployment import Application, deployment_to_schema from ray.serve.schema import ( @@ -434,7 +438,6 @@ def deploy( "--route-prefix", required=False, type=str, - default="/", help=( "Route prefix for the application. This should only be used " "when running an application specified by import path and " @@ -462,9 +465,11 @@ def run( address: str, blocking: bool, reload: bool, - route_prefix: str, + route_prefix: Optional[str], name: str, ): + if route_prefix is None: + route_prefix = DEFAULT.VALUE sys.path.insert(0, app_dir) args_dict = convert_args_to_dict(arguments) final_runtime_env = parse_runtime_env_args( @@ -795,12 +800,15 @@ def build_app_config(import_path: str, name: str = None): ) deployments = pipeline_build(app, name) + ingress = get_and_validate_ingress_deployment(deployments) schema = ServeApplicationSchema( name=name, - route_prefix="/" if len(import_paths) == 1 else f"/{name}", + route_prefix=ingress.route_prefix, import_path=import_path, runtime_env={}, - deployments=[deployment_to_schema(d) for d in deployments], + deployments=[ + deployment_to_schema(d, include_route_prefix=False) for d in deployments + ], ) return schema.dict(exclude_unset=True) diff --git a/python/ray/serve/tests/BUILD b/python/ray/serve/tests/BUILD index e666ead5ec62..8dc26704eb75 100644 --- a/python/ray/serve/tests/BUILD +++ b/python/ray/serve/tests/BUILD @@ -24,6 +24,8 @@ py_test_module_list( "test_constructor_failure.py", "test_controller.py", "test_deployment_graph_handle_serde.py", + "test_deployment_graph_ingress.py", + "test_deployment_node.py", "test_deployment_scheduler.py", "test_deployment_version.py", "test_expected_versions.py", diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 36b87f8955f3..6b151b7c2aa0 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -349,7 +349,7 @@ def f(): def g(): return "got g" - @serve.deployment + @serve.deployment(route_prefix="/my_prefix") def h(): return "got h" @@ -360,7 +360,7 @@ def __call__(self, *args): app = FastAPI() - @serve.deployment + @serve.deployment(route_prefix="/hello") @serve.ingress(app) class MyFastAPIDeployment: @app.get("/") @@ -379,12 +379,12 @@ def root(self): # Test function deployment with app name and route_prefix set in deployment # decorator - h_handle = serve.run(h.bind(), name="app_h", route_prefix="/my_prefix") + h_handle = serve.run(h.bind(), name="app_h") assert h_handle.remote().result() == "got h" assert requests.get("http://127.0.0.1:8000/my_prefix").text == "got h" # Test FastAPI - serve.run(MyFastAPIDeployment.bind(), name="FastAPI", route_prefix="/hello") + serve.run(MyFastAPIDeployment.bind(), name="FastAPI") assert requests.get("http://127.0.0.1:8000/hello").text == '"Hello, world!"' @@ -551,6 +551,55 @@ def __call__(self): assert requests.get("http://127.0.0.1:8000/").text == "got model" +@pytest.mark.parametrize( + "ingress_route,app_route", + [ + ("/hello", "/"), + ("/hello", "/override"), + ("/", "/override"), + (None, "/override"), + ("/hello", None), + (None, None), + ], +) +def test_application_route_prefix_override(serve_instance, ingress_route, app_route): + """ + Set route prefix in serve.run to a non-None value, check it overrides correctly. + """ + + @serve.deployment + def f(): + return "hello" + + node = f.options(route_prefix=ingress_route).bind() + serve.run(node, route_prefix=app_route) + if app_route is None: + routes = requests.get("http://localhost:8000/-/routes").json() + assert len(routes) == 0 + else: + assert requests.get(f"http://localhost:8000{app_route}").text == "hello" + + +@pytest.mark.parametrize("ingress_route", ["/hello", "/"]) +def test_application_route_prefix_override1(serve_instance, ingress_route): + """ + Don't set route prefix in serve.run, check it always uses the ingress deployment + route. + """ + + @serve.deployment + def f(): + return "hello" + + node = f.options(route_prefix=ingress_route).bind() + serve.run(node) + if ingress_route is None: + routes = requests.get("http://localhost:8000/-/routes").json() + assert len(routes) == 0 + else: + assert requests.get(f"http://localhost:8000{ingress_route}").text == "hello" + + class TestAppBuilder: @serve.deployment class A: @@ -758,11 +807,7 @@ def f(): pass with pytest.raises( - ValueError, - match=( - r"Invalid route_prefix 'no_slash', " - "must start with a forward slash \('/'\)" - ), + ValueError, match=r"The route_prefix must start with a forward slash \('/'\)" ): serve.run(f.bind(), route_prefix="no_slash") diff --git a/python/ray/serve/tests/test_cli_2.py b/python/ray/serve/tests/test_cli_2.py index 51e5a75124f6..3d45ea2ea31a 100644 --- a/python/ray/serve/tests/test_cli_2.py +++ b/python/ray/serve/tests/test_cli_2.py @@ -529,8 +529,8 @@ async def __call__(self): TestBuildDagNode = NoArgDriver.bind(TestBuildFNode) -TestApp1Node = global_f.options(name="app1").bind() -TestApp2Node = NoArgDriver.options(name="app2").bind(global_f.bind()) +TestApp1Node = global_f.options(route_prefix="/app1").bind() +TestApp2Node = NoArgDriver.options(route_prefix="/app2").bind(global_f.bind()) @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") @@ -948,6 +948,39 @@ def test_grpc_proxy_model_composition(ray_start_stop): ping_fruit_stand(channel, app) +@serve.deployment(route_prefix="/foo") +async def deployment_with_route_prefix(args): + return "bar..." + + +route_prefix_app = deployment_with_route_prefix.bind() + + +@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") +def test_serve_run_mount_to_correct_deployment_route_prefix(ray_start_stop): + """Test running serve run with deployment with route_prefix should mount the + deployment to the correct route.""" + + import_path = "ray.serve.tests.test_cli_2.route_prefix_app" + subprocess.Popen(["serve", "run", import_path]) + + # /-/routes should show the app having the correct route. + wait_for_condition( + lambda: requests.get("http://localhost:8000/-/routes").text + == '{"/foo":"default"}' + ) + + # Ping root path directly should 404. + wait_for_condition( + lambda: requests.get("http://localhost:8000/").status_code == 404 + ) + + # Ping the mounting route should return 200. + wait_for_condition( + lambda: requests.get("http://localhost:8000/foo").status_code == 200 + ) + + @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") def test_control_c_shutdown_serve_components(ray_start_stop): """Test ctrl+c after `serve run` shuts down serve components.""" diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 9b8a92551fbe..7cde6f9f73a9 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -610,6 +610,7 @@ class DClass: num_replicas=2, user_config="hi", max_ongoing_requests=100, + route_prefix="/hello", ray_actor_options={"num_cpus": 2}, )(DClass) @@ -618,12 +619,15 @@ class DClass: assert D.num_replicas == 2 assert D.user_config == "hi" assert D.max_ongoing_requests == 100 + assert D.route_prefix == "/hello" assert D.ray_actor_options == {"num_cpus": 2} D = serve.deployment( version=None, + route_prefix=None, )(DClass) assert D.version is None + assert D.route_prefix is None if __name__ == "__main__": diff --git a/python/ray/serve/tests/test_deploy_2.py b/python/ray/serve/tests/test_deploy_2.py index ba09f37ff70a..fd5cb4f59eb6 100644 --- a/python/ray/serve/tests/test_deploy_2.py +++ b/python/ray/serve/tests/test_deploy_2.py @@ -21,6 +21,25 @@ from ray.util.state import list_actors +@pytest.mark.parametrize("prefixes", [[None, "/f", None], ["/f", None, "/f"]]) +def test_deploy_nullify_route_prefix(serve_instance, prefixes): + # With multi dags support, dag driver will receive all route + # prefix when route_prefix is "None", since "None" will be converted + # to "/" internally. + # Note: the expose http endpoint will still be removed for internal + # dag node by setting "None" to route_prefix + @serve.deployment + def f(*args): + return "got me" + + for prefix in prefixes: + dag = f.options(route_prefix=prefix).bind() + handle = serve.run(dag) + assert requests.get("http://localhost:8000/f").status_code == 200 + assert requests.get("http://localhost:8000/f").text == "got me" + assert handle.remote().result() == "got me" + + @pytest.mark.timeout(10, method="thread") def test_deploy_empty_bundle(serve_instance): @serve.deployment(ray_actor_options={"num_cpus": 0}) diff --git a/python/ray/serve/tests/test_deploy_app.py b/python/ray/serve/tests/test_deploy_app.py index f1a3548faf30..0bea9ea6d25b 100644 --- a/python/ray/serve/tests/test_deploy_app.py +++ b/python/ray/serve/tests/test_deploy_app.py @@ -1115,6 +1115,53 @@ def test_get_app_handle(client: ServeControllerClient): assert handle_2.route.remote("ADD", 2).result() == "5 pizzas please!" +@pytest.mark.parametrize("heavyweight", [True, False]) +def test_deploy_lightweight_multiple_route_prefix( + client: ServeControllerClient, heavyweight: bool +): + """If user deploys a config that sets route prefix for a non-ingress deployment, + the deploy should fail. + """ + + config = { + "applications": [ + { + "name": "default", + "import_path": "ray.serve.tests.test_config_files.world.DagNode", + } + ] + } + client.deploy_apps(ServeDeploySchema(**config)) + + def check(): + assert requests.post("http://localhost:8000/").text == "wonderful world" + return True + + wait_for_condition(check) + + # Add route prefix for non-ingress deployment + config["applications"][0]["deployments"] = [{"name": "f", "route_prefix": "/"}] + if heavyweight: + # Trigger re-build of the application + config["applications"][0]["runtime_env"] = {"env_vars": {"test": "3"}} + client.deploy_apps(ServeDeploySchema(**config)) + + def check_failed(): + s = serve.status().applications["default"] + assert s.status == ApplicationStatus.DEPLOY_FAILED + assert "Found multiple route prefixes" in s.message + return True + + wait_for_condition(check_failed) + + # Check 10 more times to make sure the status doesn't oscillate + for _ in range(10): + s = serve.status().applications["default"] + assert s.status == ApplicationStatus.DEPLOY_FAILED + assert "Found multiple route prefixes" in s.message + time.sleep(0.1) + + @pytest.mark.parametrize("rebuild", [True, False]) def test_redeploy_old_config_after_failed_deployment( client: ServeControllerClient, rebuild @@ -1150,9 +1197,9 @@ def check_application_running(): ] = "ray.serve.tests.test_config_files.import_error.app" err_msg = "ZeroDivisionError" else: - # Set config for a nonexistent deployment - new_app_config["deployments"] = [{"name": "nonexistent", "num_replicas": 1}] - err_msg = "nonexistent deployment 'nonexistent'" + # Trying to add a route prefix for non-ingress deployment will fail + new_app_config["deployments"] = [{"name": "f", "route_prefix": "/"}] + err_msg = "Found multiple route prefixes" client.deploy_apps(ServeDeploySchema(**{"applications": [new_app_config]})) def check_deploy_failed(message): diff --git a/python/ray/serve/tests/test_deployment_graph_ingress.py b/python/ray/serve/tests/test_deployment_graph_ingress.py new file mode 100644 index 000000000000..b98b6edab19d --- /dev/null +++ b/python/ray/serve/tests/test_deployment_graph_ingress.py @@ -0,0 +1,116 @@ +import sys + +import pytest + +from ray import serve +from ray.serve._private.deployment_graph_build import build as pipeline_build +from ray.serve._private.deployment_graph_build import ( + get_and_validate_ingress_deployment, +) + + +@serve.deployment +class Model: + def __init__(self, val): + self.val = val + + def forward(self, input): + return self.val + input + + +@serve.deployment +def func_deployment(): + return "hello" + + +@serve.deployment +def combine(input_1, input_2): + return input_1 + input_2 + + +@serve.deployment +class Driver: + def __init__(self, dag): + self.dag = dag + + async def __call__(self, inp): + return await self.dag.remote(inp) + + +def test_http_user_bring_own_driver_route_prefix(serve_instance): + m1 = Model.bind(1) + serve_dag = Driver.options(route_prefix="/hello").bind(m1) + + deployments = pipeline_build(serve_dag) + ingress_deployment = get_and_validate_ingress_deployment(deployments) + assert ingress_deployment.route_prefix == "/hello" + for deployment in deployments[:-1]: + assert deployment.route_prefix is None + + +def test_http_no_non_ingress_deployment_route_prefix(serve_instance): + m1 = Model.options(route_prefix="/should-fail").bind(1) + serve_dag = Driver.bind(m1) + + with pytest.raises( + ValueError, + match="Route prefix is only configurable on the ingress deployment", + ): + _ = pipeline_build(serve_dag) + + +def test_http_we_provide_default_route_prefix_cls(serve_instance): + """Ensure the default ingress deployment route is '/' instead of driver + class name + """ + + m1 = Model.bind(1) + serve_dag = Driver.bind(m1) + + deployments = pipeline_build(serve_dag) + ingress_deployment = get_and_validate_ingress_deployment(deployments) + assert ingress_deployment.route_prefix == "/" + for deployment in deployments[:-1]: + assert deployment.route_prefix is None + + +def test_http_we_provide_default_route_prefix_func(serve_instance): + """Ensure the default ingress deployment route is '/' instead of driver + function name + """ + func_dag = func_deployment.bind() + deployments = pipeline_build(func_dag) + ingress_deployment = get_and_validate_ingress_deployment(deployments) + assert ingress_deployment.route_prefix == "/" + + +def test_http_non_default_route_prefix_on_non_root_node(serve_instance): + m1 = Model.bind(1) + serve_dag = Driver.bind(m1) + + deployments = pipeline_build(serve_dag) + non_root_deployment = deployments[0].options(route_prefix="/") + deployments[0] = non_root_deployment + + with pytest.raises( + ValueError, + match=( + "Only one deployment in an Serve Application or DAG can have " + "non-None route prefix" + ), + ): + _ = get_and_validate_ingress_deployment(deployments) + + +def test_http_reconfigure_non_default_route_prefix_on_root(serve_instance): + m1 = Model.bind(1) + serve_dag = Driver.bind(m1) + + deployments = pipeline_build(serve_dag) + non_root_deployment = deployments[-1].options(route_prefix="/yoo") + deployments[-1] = non_root_deployment + _ = get_and_validate_ingress_deployment(deployments) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_deployment_node.py b/python/ray/serve/tests/test_deployment_node.py new file mode 100644 index 000000000000..dbd17ed24bb2 --- /dev/null +++ b/python/ray/serve/tests/test_deployment_node.py @@ -0,0 +1,82 @@ +import pytest + +import ray +from ray import serve +from ray.serve._private.deployment_node import DeploymentNode + + +@serve.deployment +class ServeActor: + def __init__(self, init_value=0): + self.i = init_value + + def inc(self): + self.i += 1 + + def get(self): + return self.i + + +class SyncActor: + def __init__(self, init_value=0): + self.i = init_value + + def inc(self): + self.i += 1 + + def get(self): + return self.i + + +class Actor: + def __init__(self, init_value=0): + self.i = init_value + + async def inc(self): + self.i += 1 + + async def get(self): + return self.i + + +@pytest.mark.skip(reason="async handle not enabled yet") +async def test_simple_deployment_async(serve_instance): + """Internal testing only for simple creation and execution. + + User should NOT directly create instances of Deployment or DeploymentNode. + """ + node = DeploymentNode( + Actor, + "test", + (10,), + {}, + {}, + ) + node._deployment._deploy() + handle = node._deployment_handle + + assert ray.get(await node.get.execute()) == 10 + ray.get(await node.inc.execute()) + assert ray.get(await node.get.execute()) == 11 + assert ray.get(await node.get.execute()) == ray.get(await handle.get.remote()) + + +def test_mix_sync_async_handle(serve_instance): + # TODO: (jiaodong) Add complex multi-deployment tests from ray DAG. + pass + + +def test_deployment_node_as_init_args(serve_instance): + # TODO: (jiaodong) Add complex multi-deployment tests from ray DAG. + pass + + +def test_multi_deployment_nodes(serve_instance): + # TODO: (jiaodong) Add complex multi-deployment tests from ray DAG. + pass + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/serve/tests/test_fastapi.py b/python/ray/serve/tests/test_fastapi.py index 6c4e0b7ab4c8..57ec2f20eeea 100644 --- a/python/ray/serve/tests/test_fastapi.py +++ b/python/ray/serve/tests/test_fastapi.py @@ -30,6 +30,7 @@ from ray.serve._private.client import ServeControllerClient from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME from ray.serve._private.http_util import make_fastapi_class_based_view +from ray.serve._private.utils import DEFAULT from ray.serve.exceptions import RayServeException from ray.serve.handle import DeploymentHandle @@ -439,7 +440,7 @@ class MyApp: @pytest.mark.parametrize( "input_route_prefix,expected_route_prefix", - [("/", "/"), ("/subpath", "/subpath/")], + [(DEFAULT.VALUE, "/"), ("/", "/"), ("/subpath", "/subpath/")], ) def test_doc_generation(serve_instance, input_route_prefix, expected_route_prefix): app = FastAPI() diff --git a/python/ray/serve/tests/test_http_routes.py b/python/ray/serve/tests/test_http_routes.py index 4d7cf3e153a7..602b9ef006e0 100644 --- a/python/ray/serve/tests/test_http_routes.py +++ b/python/ray/serve/tests/test_http_routes.py @@ -11,45 +11,38 @@ def test_path_validation(serve_instance): - @serve.deployment - class D: - pass - # Path prefix must start with /. with pytest.raises(ValueError): - serve.run(D.bind(), route_prefix="hello") + + @serve.deployment(route_prefix="hello") + class D1: + pass # Path prefix must not end with / unless it's the root. with pytest.raises(ValueError): - serve.run(D.bind(), route_prefix="/hello/") + + @serve.deployment(route_prefix="/hello/") + class D2: + pass # Wildcards not allowed with new ingress support. with pytest.raises(ValueError): - serve.run(D.bind(), route_prefix="/{hello}") + @serve.deployment(route_prefix="/{hello}") + class D3: + pass -def test_routes_healthz(serve_instance): - # Should return 503 until there are any routes populated. - resp = requests.get("http://localhost:8000/-/healthz") - assert resp.status_code == 503 - assert resp.text == "Route table is not populated yet." + @serve.deployment(route_prefix="/duplicate") + class D4: + pass - @serve.deployment - class D1: - def __call__(self, *args): - return "hi" + serve.run(D4.bind()) - # D1 not exposed over HTTP so should still return 503. - serve.run(D1.bind(), route_prefix=None) - resp = requests.get("http://localhost:8000/-/healthz") - assert resp.status_code == 503 - assert resp.text == "Route table is not populated yet." - # D1 exposed over HTTP, should return 200 OK. - serve.run(D1.bind(), route_prefix="/") +def test_routes_healthz(serve_instance): resp = requests.get("http://localhost:8000/-/healthz") assert resp.status_code == 200 - assert resp.text == "success" + assert resp.content == b"success" def test_routes_endpoint(serve_instance): @@ -79,7 +72,7 @@ def __call__(self, *args): def test_deployment_without_route(serve_instance): - @serve.deployment + @serve.deployment(route_prefix=None) class D: def __call__(self, *args): return "1" @@ -138,7 +131,7 @@ def __call__(self, *args): check_req("/", text="2") check_req("/a", text="2") - @serve.deployment + @serve.deployment(route_prefix="/hello/world") class D3: def __call__(self, *args): return "3" diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index bb76cb36c31f..0936432dc3aa 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -27,6 +27,7 @@ SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE, SERVE_PROXY_NAME, + SERVE_ROOT_URL_ENV_KEY, ) from ray.serve._private.default_impl import create_cluster_node_info_cache from ray.serve._private.http_util import set_socket_reuse_port @@ -417,6 +418,40 @@ class Dummy: assert resp.headers["access-control-allow-origin"] == "*" +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows") +def test_http_root_url(ray_shutdown): + @serve.deployment + def f(_): + pass + + root_url = "https://my.domain.dev/prefix" + + port = new_port() + os.environ[SERVE_ROOT_URL_ENV_KEY] = root_url + serve.start(http_options=dict(port=port)) + serve.run(f.bind()) + assert f.url == root_url + "/f" + serve.shutdown() + ray.shutdown() + del os.environ[SERVE_ROOT_URL_ENV_KEY] + + port = new_port() + serve.start(http_options=dict(port=port)) + serve.run(f.bind()) + assert f.url != root_url + "/f" + assert f.url == f"http://127.0.0.1:{port}/f" + serve.shutdown() + ray.shutdown() + + ray.init(runtime_env={"env_vars": {SERVE_ROOT_URL_ENV_KEY: root_url}}) + port = new_port() + serve.start(http_options=dict(port=port)) + serve.run(f.bind()) + assert f.url == root_url + "/f" + serve.shutdown() + ray.shutdown() + + @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows") def test_http_root_path(ray_shutdown): @serve.deployment @@ -428,8 +463,11 @@ def hello(): serve.start(http_options=dict(root_path=root_path, port=port)) serve.run(hello.bind(), route_prefix="/hello") + # check whether url is prefixed correctly + assert hello.url == f"http://127.0.0.1:{port}{root_path}/hello" + # check routing works as expected - resp = requests.get(f"http://127.0.0.1:{port}{root_path}/hello") + resp = requests.get(hello.url) assert resp.status_code == 200 assert resp.text == "hello" @@ -498,7 +536,7 @@ def test_http_head_only(ray_cluster): cpu_per_nodes = { r["CPU"] for r in ray._private.state.available_resources_per_node().values() } - assert cpu_per_nodes == {4} + assert cpu_per_nodes == {4, 4} def test_serve_shutdown(ray_shutdown): diff --git a/python/ray/serve/tests/test_websockets.py b/python/ray/serve/tests/test_websockets.py index c4d7cca7b262..05a2da556a81 100644 --- a/python/ray/serve/tests/test_websockets.py +++ b/python/ray/serve/tests/test_websockets.py @@ -28,7 +28,10 @@ async def ws_handler(self, ws: WebSocket): bytes = await ws.receive_bytes() await ws.send_bytes(bytes) - serve.run(WebSocketServer.bind(), route_prefix=route_prefix or "/") + if route_prefix is not None: + WebSocketServer = WebSocketServer.options(route_prefix=route_prefix) + + serve.run(WebSocketServer.bind()) msg = "Hello world!" if route_prefix: diff --git a/python/ray/serve/tests/unit/test_application_state.py b/python/ray/serve/tests/unit/test_application_state.py index 108ce319a9e4..b0c96f278757 100644 --- a/python/ray/serve/tests/unit/test_application_state.py +++ b/python/ray/serve/tests/unit/test_application_state.py @@ -1141,7 +1141,19 @@ def test_override_autoscaling_config(self, info): assert updated_info.deployment_config.autoscaling_config.initial_replicas == 12 assert updated_info.deployment_config.autoscaling_config.max_replicas == 79 - def test_override_route_prefix(self, info): + def test_override_route_prefix_1(self, info): + config = ServeApplicationSchema( + name="default", + import_path="test.import.path", + deployments=[DeploymentSchema(name="A", route_prefix="/alice")], + ) + + updated_infos = override_deployment_info({"A": info}, config) + updated_info = updated_infos["A"] + assert updated_info.route_prefix == "/alice" + assert updated_info.version == "123" + + def test_override_route_prefix_2(self, info): config = ServeApplicationSchema( name="default", import_path="test.import.path", @@ -1158,6 +1170,19 @@ def test_override_route_prefix(self, info): assert updated_info.route_prefix == "/bob" assert updated_info.version == "123" + def test_override_route_prefix_3(self, info): + config = ServeApplicationSchema( + name="default", + import_path="test.import.path", + route_prefix="/bob", + deployments=[DeploymentSchema(name="A", route_prefix="/alice")], + ) + + updated_infos = override_deployment_info({"A": info}, config) + updated_info = updated_infos["A"] + assert updated_info.route_prefix == "/bob" + assert updated_info.version == "123" + def test_override_ray_actor_options_1(self, info): """Test runtime env specified in config at deployment level.""" config = ServeApplicationSchema( diff --git a/python/ray/serve/tests/unit/test_deployment_class.py b/python/ray/serve/tests/unit/test_deployment_class.py index 75743f6184ac..54dd23d0b9f2 100644 --- a/python/ray/serve/tests/unit/test_deployment_class.py +++ b/python/ray/serve/tests/unit/test_deployment_class.py @@ -81,6 +81,7 @@ class TestDeploymentOptions: "name": "test", "version": "abcd", "num_replicas": 1, + "route_prefix": "/", "ray_actor_options": {}, "user_config": {}, "max_ongoing_requests": 10, @@ -177,6 +178,7 @@ def f(): "option", [ "num_replicas", + "route_prefix", "autoscaling_config", "user_config", ], diff --git a/python/ray/serve/tests/unit/test_schema.py b/python/ray/serve/tests/unit/test_schema.py index c7bc18788c7d..1959cf20bb98 100644 --- a/python/ray/serve/tests/unit/test_schema.py +++ b/python/ray/serve/tests/unit/test_schema.py @@ -288,6 +288,39 @@ def test_validate_max_queued_requests(self): with pytest.raises(ValidationError): DeploymentSchema.parse_obj(deployment_schema) + def test_route_prefix(self): + # Ensure that route_prefix is validated + + deployment_schema = self.get_minimal_deployment_schema() + + # route_prefix must start with a "/" + deployment_schema["route_prefix"] = "hello/world" + with pytest.raises(ValueError): + DeploymentSchema.parse_obj(deployment_schema) + + # route_prefix must end with a "/" + deployment_schema["route_prefix"] = "/hello/world/" + with pytest.raises(ValueError): + DeploymentSchema.parse_obj(deployment_schema) + + # route_prefix cannot contain wildcards, meaning it can't have + # "{" or "}" + deployment_schema["route_prefix"] = "/hello/{adjective}/world/" + with pytest.raises(ValueError): + DeploymentSchema.parse_obj(deployment_schema) + + # Ensure a valid route_prefix works + deployment_schema["route_prefix"] = "/hello/wonderful/world" + DeploymentSchema.parse_obj(deployment_schema) + + # Ensure route_prefix of "/" works + deployment_schema["route_prefix"] = "/" + DeploymentSchema.parse_obj(deployment_schema) + + # Ensure route_prefix of None works + deployment_schema["route_prefix"] = None + DeploymentSchema.parse_obj(deployment_schema) + def test_mutually_exclusive_num_replicas_and_autoscaling_config(self): # num_replicas and autoscaling_config cannot be set at the same time deployment_schema = self.get_minimal_deployment_schema() @@ -724,6 +757,7 @@ def global_f(): def test_deployment_to_schema_to_deployment(): @serve.deployment( num_replicas=3, + route_prefix="/hello", ray_actor_options={ "runtime_env": { "working_dir": TEST_MODULE_PINNED_URI, @@ -744,6 +778,7 @@ def f(): ) assert deployment.num_replicas == 3 + assert deployment.route_prefix == "/hello" assert ( deployment.ray_actor_options["runtime_env"]["working_dir"] == TEST_MODULE_PINNED_URI @@ -759,6 +794,7 @@ def test_unset_fields_schema_to_deployment_ray_actor_options(): @serve.deployment( num_replicas=3, + route_prefix="/hello", ray_actor_options={}, ) def f(): diff --git a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py index f6add0c302df..bace953e5ba6 100644 --- a/release/golden_notebook_tests/workloads/torch_tune_serve_test.py +++ b/release/golden_notebook_tests/workloads/torch_tune_serve_test.py @@ -151,7 +151,7 @@ def get_model(checkpoint_dir: str): return model -@serve.deployment(name="mnist") +@serve.deployment(name="mnist", route_prefix="/mnist") class MnistDeployment: def __init__(self, model): use_cuda = torch.cuda.is_available() @@ -184,8 +184,7 @@ def setup_serve(model, use_gpu: bool = False): ray_actor_options={"num_gpus": 1, "resources": {"worker": 1}} if use_gpu else {}, - ).bind(model), - route_prefix="/mnist", + ).bind(model) )