Skip to content

Commit

Permalink
Add HTTP API to scheduler (#6270)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 authored May 17, 2022
1 parent af3b93e commit 63cdddd
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 0 deletions.
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ distributed:
- distributed.http.scheduler.prometheus
- distributed.http.scheduler.info
- distributed.http.scheduler.json
- distributed.http.scheduler.api
- distributed.http.health
- distributed.http.proxy
- distributed.http.statics
Expand Down
71 changes: 71 additions & 0 deletions distributed/http/scheduler/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

import json

from distributed.http.utils import RequestHandler


class APIHandler(RequestHandler):
    """Root endpoint of the scheduler HTTP API; reports the API version."""

    def get(self):
        """Answer with the plain-text body ``API V1``."""
        # Declare the content type up front, as the other handlers in this
        # module do; the header and the body are independent of each other.
        self.set_header("Content-Type", "text/plain")
        self.write("API V1")


class RetireWorkersHandler(RequestHandler):
    """Endpoint that retires workers selected by a JSON request body."""

    async def post(self):
        """Retire workers and write the scheduler's result as JSON.

        The request body is parsed as JSON. When it carries a truthy ``"n"``
        the scheduler is asked, via ``workers_to_close(n=...)``, which workers
        to retire; otherwise the value under ``"workers"`` (an empty dict when
        absent) is handed to ``retire_workers`` unchanged.

        Any exception raised along the way yields status 500 whose reason is
        the exception text, together with a generic JSON error body.
        """
        self.set_header("Content-Type", "text/json")
        scheduler = self.server
        try:
            payload = json.loads(self.request.body)
            count = payload.get("n", 0)
            if count:
                # Let the scheduler decide which workers to close.
                targets = scheduler.workers_to_close(n=count)
            else:
                # Caller named the workers explicitly (or named none).
                targets = payload.get("workers", {})
            retired = await scheduler.retire_workers(workers=targets)
            self.write(json.dumps(retired))
        except Exception as exc:
            self.set_status(500, str(exc))
            self.write(json.dumps({"Error": "Internal Server Error"}))


class GetWorkersHandler(RequestHandler):
    """Endpoint that lists the workers currently held by the scheduler."""

    def get(self):
        """Write a JSON object with the worker count and per-worker details.

        The body has the form
        ``{"num_workers": <int>, "workers": [{"name": ..., "address": ...}]}``.
        Any exception yields status 500 whose reason is the exception text,
        together with a generic JSON error body.
        """
        self.set_header("Content-Type", "text/json")
        scheduler = self.server
        try:
            worker_count = len(scheduler.workers)
            listing = [
                dict(name=state.name, address=state.address)
                for state in scheduler.workers.values()
            ]
            body = json.dumps({"num_workers": worker_count, "workers": listing})
            self.write(body)
        except Exception as exc:
            self.set_status(500, str(exc))
            self.write(json.dumps({"Error": "Internal Server Error"}))


class AdaptiveTargetHandler(RequestHandler):
    """Endpoint that reports the scheduler's ``adaptive_target()`` value."""

    def get(self):
        """Write ``{"workers": <adaptive_target()>}`` as JSON.

        Any exception yields status 500 whose reason is the exception text,
        together with a generic JSON error body.
        """
        self.set_header("Content-Type", "text/json")
        scheduler = self.server
        try:
            body = json.dumps({"workers": scheduler.adaptive_target()})
            self.write(body)
        except Exception as exc:
            self.set_status(500, str(exc))
            self.write(json.dumps({"Error": "Internal Server Error"}))


# Route table for this module: (URL path, handler class, handler kwargs).
# Every endpoint lives under the ``/api/v1`` prefix; the bare prefix itself
# is served by APIHandler. Each entry gets its own empty kwargs dict.
routes: list[tuple] = [
    ("/api/v1" + suffix, handler, {})
    for suffix, handler in (
        ("", APIHandler),
        ("/retire_workers", RetireWorkersHandler),
        ("/get_workers", GetWorkersHandler),
        ("/adaptive_target", AdaptiveTargetHandler),
    )
]
51 changes: 51 additions & 0 deletions distributed/http/scheduler/tests/test_scheduler_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import re

import aiohttp
import pytest

pytest.importorskip("bokeh")
Expand Down Expand Up @@ -245,3 +246,53 @@ async def test_eventstream(c, s, a, b):
)
assert "websocket" in str(s.plugins).lower()
ws_client.close()


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_api(c, s, a, b):
    """The API root answers 200 with the plain-text body ``API V1``."""
    url = "http://localhost:%d/api/v1" % s.http_server.port
    async with aiohttp.ClientSession() as session, session.get(url) as resp:
        assert resp.status == 200
        assert resp.headers["Content-Type"] == "text/plain"
        body = await resp.text()
        assert body == "API V1"


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_retire_workers(c, s, a, b):
    """Posting both worker addresses returns info on two retired workers."""
    url = "http://localhost:%d/api/v1/retire_workers" % s.http_server.port
    params = {"workers": [a.address, b.address]}
    async with aiohttp.ClientSession() as session, session.post(
        url, json=params
    ) as resp:
        assert resp.status == 200
        assert resp.headers["Content-Type"] == "text/json"
        # Decode by hand: the handler serves "text/json", so the payload is
        # read as text and parsed with the json module.
        retired_workers_info = json.loads(await resp.text())
        assert len(retired_workers_info) == 2


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_get_workers(c, s, a, b):
    """The worker listing contains exactly the two cluster workers."""
    url = "http://localhost:%d/api/v1/get_workers" % s.http_server.port
    async with aiohttp.ClientSession() as session, session.get(url) as resp:
        assert resp.status == 200
        assert resp.headers["Content-Type"] == "text/json"
        payload = json.loads(await resp.text())
        addresses = {entry.get("address") for entry in payload["workers"]}
        assert addresses == {a.address, b.address}


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_adaptive_target(c, s, a, b):
    """With no work submitted, the reported adaptive target is zero."""
    url = "http://localhost:%d/api/v1/adaptive_target" % s.http_server.port
    async with aiohttp.ClientSession() as session, session.get(url) as resp:
        assert resp.status == 200
        assert resp.headers["Content-Type"] == "text/json"
        payload = json.loads(await resp.text())
        assert payload["workers"] == 0
16 changes: 16 additions & 0 deletions docs/source/http_services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ Pages and JSON endpoints served by the scheduler
- ``/statics/()``: static file content (CSS, etc)
- ``/stealing``: worker occupancy metrics, to evaluate task stealing

Scheduler API
-------------

Scheduler methods exposed by the API, with an example of the request body where one is required:

- ``/api/v1/retire_workers`` (``POST``): retire certain workers on the scheduler

  .. code-block:: json

      {
          "workers": ["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"]
      }

  Alternatively, send ``{"n": 2}`` to let the scheduler choose which workers to retire.

- ``/api/v1/get_workers`` (``GET``): get all workers on the scheduler
- ``/api/v1/adaptive_target`` (``GET``): get the target number of workers based on the scheduler's load

Individual bokeh plots
----------------------

Expand Down

0 comments on commit 63cdddd

Please sign in to comment.