Skip to content

Commit

Permalink
feat(learning): Integrate GLTorch with GraphScope in Server-Client Mo…
Browse files Browse the repository at this point in the history
…de with K8S Deployment Support (#3624)


Co-authored-by: Hongyi ZHANG <50618951+Zhanghyi@users.noreply.github.com>
Co-authored-by: Jia Zhibin <56682441+Jia-zb@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 2, 2024
1 parent 2904c64 commit a7ec218
Show file tree
Hide file tree
Showing 27 changed files with 1,798 additions and 95 deletions.
88 changes: 70 additions & 18 deletions coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@

from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME
from gscoordinator.constants import DATASET_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_CONTAINER_NAME
from gscoordinator.constants import GRAPHLEARN_TORCH_CONTAINER_NAME
from gscoordinator.constants import INTERACTIVE_EXECUTOR_CONTAINER_NAME
from gscoordinator.constants import INTERACTIVE_FRONTEND_CONTAINER_NAME
from gscoordinator.constants import LEARNING_CONTAINER_NAME
from gscoordinator.utils import parse_as_glog_level
from gscoordinator.version import __version__

Expand Down Expand Up @@ -63,7 +64,8 @@ def __init__(
self,
config: Config,
engine_pod_prefix,
learning_start_port,
graphlearn_start_port,
graphlearn_torch_start_port,
):
self._instance_id = config.session.instance_id
self._glog_level = parse_as_glog_level(config.log_level)
Expand All @@ -81,7 +83,8 @@ def __init__(
self._with_analytical = launcher_config.engine.enable_gae
self._with_analytical_java = launcher_config.engine.enable_gae_java
self._with_interactive = launcher_config.engine.enable_gie
self._with_learning = launcher_config.engine.enable_gle
self._with_graphlearn = launcher_config.engine.enable_gle
self._with_graphlearn_torch = launcher_config.engine.enable_glt
self._with_mars = launcher_config.mars.enable

def load_base64_json(string):
Expand All @@ -108,7 +111,8 @@ def load_base64_json(string):
self._analytical_java_image = f"{image_prefix}/analytical-java:{tag}"
self._interactive_frontend_image = f"{image_prefix}/interactive-frontend:{tag}"
self._interactive_executor_image = f"{image_prefix}/interactive-executor:{tag}"
self._learning_image = f"{image_prefix}/learning:{tag}"
self._graphlearn_image = f"{image_prefix}/graphlearn:{tag}"
self._graphlearn_torch_image = f"{image_prefix}/graphlearn-torch:{tag}"
self._dataset_image = f"{image_prefix}/dataset:{tag}"

self._vineyard_deployment = config.vineyard.deployment_name
Expand All @@ -121,12 +125,14 @@ def load_base64_json(string):
self._engine_pod_prefix = engine_pod_prefix
self._analytical_prefix = "gs-analytical-"
self._interactive_frontend_prefix = "gs-interactive-frontend-"
self._learning_prefix = "gs-learning-"
self._graphlearn_prefix = "gs-graphlearn-"
self._graphlearn_torch_prefix = "gs-graphlearn-torch-"
self._vineyard_prefix = "vineyard-"
self._mars_scheduler_name_prefix = "mars-scheduler-"
self._mars_service_name_prefix = "mars-"

self._learning_start_port = learning_start_port
self._graphlearn_start_port = graphlearn_start_port
self._graphlearn_torch_start_port = graphlearn_torch_start_port

self._engine_labels = {
"app.kubernetes.io/name": "graphscope",
Expand Down Expand Up @@ -255,17 +261,36 @@ def get_interactive_executor_container(self, volume_mounts):
)
return container

def get_learning_container(self, volume_mounts):
name = LEARNING_CONTAINER_NAME
image = self._learning_image
def get_graphlearn_container(self, volume_mounts):
    """Build the GraphLearn (GLE) engine container for the engine pod.

    The container is started idle (``tail -f /dev/null``) so work can be
    launched into it later, and it pre-declares a window of 1000 container
    ports beginning at ``self._graphlearn_start_port`` for learning servers.
    """
    container = self.get_engine_container_helper(
        GRAPHLEARN_CONTAINER_NAME,
        self._graphlearn_image,
        ["tail", "-f", "/dev/null"],  # keep the container alive doing nothing
        volume_mounts,
        self._engine_resources.gle_resource,
    )
    base = self._graphlearn_start_port
    container.ports = [
        kube_client.V1ContainerPort(container_port=port)
        for port in range(base, base + 1000)
    ]
    return container

def get_graphlearn_torch_container(self, volume_mounts):
    """Build the GraphLearn-for-PyTorch (GLT) engine container.

    Mirrors :meth:`get_graphlearn_container`: the container idles via
    ``tail -f /dev/null`` and exposes 1000 consecutive container ports
    starting at ``self._graphlearn_torch_start_port``.
    """
    container = self.get_engine_container_helper(
        GRAPHLEARN_TORCH_CONTAINER_NAME,
        self._graphlearn_torch_image,
        ["tail", "-f", "/dev/null"],  # keep the container alive doing nothing
        volume_mounts,
        self._engine_resources.glt_resource,
    )
    base = self._graphlearn_torch_start_port
    container.ports = [
        kube_client.V1ContainerPort(container_port=port)
        for port in range(base, base + 1000)
    ]
    return container

Expand Down Expand Up @@ -338,9 +363,13 @@ def get_engine_pod_spec(self):
volume_mounts=engine_volume_mounts
)
)
if self._with_learning:
if self._with_graphlearn:
containers.append(
self.get_learning_container(volume_mounts=engine_volume_mounts)
self.get_graphlearn_container(volume_mounts=engine_volume_mounts)
)
if self._with_graphlearn_torch:
containers.append(
self.get_graphlearn_torch_container(volume_mounts=engine_volume_mounts)
)

if self._with_dataset:
Expand Down Expand Up @@ -397,10 +426,10 @@ def get_engine_headless_service(self):
)
return service

def get_learning_service(self, object_id, start_port):
def get_graphlearn_service(self, object_id, start_port):
service_type = self._service_type
num_workers = self._num_workers
name = self.get_learning_service_name(object_id)
name = self.get_graphlearn_service_name(object_id)
ports = []
for i in range(start_port, start_port + num_workers):
port = kube_client.V1ServicePort(name=f"{name}-{i}", port=i, protocol="TCP")
Expand All @@ -413,10 +442,30 @@ def get_learning_service(self, object_id, start_port):
)
return service

def get_learning_ports(self, start_port):
def get_graphlearn_torch_service(self, object_id, start_port):
    """Build the Kubernetes Service fronting the GLT workers of a graph.

    Exposes one TCP port per worker, numbered consecutively from
    ``start_port``; each port is named ``<service-name>-<port>``.
    """
    name = self.get_graphlearn_torch_service_name(object_id)
    ports = [
        kube_client.V1ServicePort(name=f"{name}-{p}", port=p, protocol="TCP")
        for p in range(start_port, start_port + self._num_workers)
    ]
    spec = ResourceBuilder.get_service_spec(
        self._service_type, ports, self._engine_labels, "Local"
    )
    return ResourceBuilder.get_service(
        self._namespace, name, spec, self._engine_labels
    )

def get_graphlearn_ports(self, start_port):
    """Return the list of ports assigned to the GraphLearn servers.

    One port per worker, consecutive from ``start_port``.

    Args:
        start_port: First port of the range.

    Returns:
        list[int]: ``[start_port, ..., start_port + num_workers - 1]``.
    """
    # list(range(...)) instead of the redundant [i for i in range(...)].
    return list(range(start_port, start_port + self._num_workers))

def get_graphlearn_torch_ports(self, start_port, num_loaders=4):
    """Return the list of ports assigned to the GLT data loaders.

    Args:
        start_port: First port of the range.
        num_loaders: Number of consecutive loader ports to allocate.
            Defaults to 4, the value previously hard-coded here —
            presumably matching the GLT loader count; confirm with callers.

    Returns:
        list[int]: ``[start_port, ..., start_port + num_loaders - 1]``.
    """
    # list(range(...)) instead of the redundant [i for i in range(...)].
    return list(range(start_port, start_port + num_loaders))

@property
def engine_stateful_set_name(self):
return f"{self._engine_pod_prefix}{self._instance_id}"
Expand Down Expand Up @@ -444,11 +493,14 @@ def get_vineyard_service_endpoint(self, api_client):
assert len(endpoints) > 0
return endpoints[0]

def get_learning_service_name(self, object_id):
return f"{self._learning_prefix}{object_id}"
def get_graphlearn_service_name(self, object_id):
    """Name of the Service exposing GraphLearn for the given graph object."""
    return "{}{}".format(self._graphlearn_prefix, object_id)

def get_graphlearn_torch_service_name(self, object_id):
    """Name of the Service exposing GraphLearn-Torch for the given graph object."""
    return "{}{}".format(self._graphlearn_torch_prefix, object_id)

def get_graphlearn_service_endpoint(self, api_client, object_id, pod_host_ip_list):
service_name = self.get_learning_service_name(object_id)
service_name = self.get_graphlearn_service_name(object_id)
service_type = self._service_type
core_api = kube_client.CoreV1Api(api_client)
if service_type == "NodePort":
Expand Down
3 changes: 2 additions & 1 deletion coordinator/gscoordinator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# Container names used when composing the multi-container engine pod spec;
# cluster_builder.py selects among these when assembling each pod.
ANALYTICAL_CONTAINER_NAME = "engine"
INTERACTIVE_FRONTEND_CONTAINER_NAME = "frontend"
INTERACTIVE_EXECUTOR_CONTAINER_NAME = "executor"
LEARNING_CONTAINER_NAME = "learning"
# GLE (GraphLearn) and GLT (GraphLearn-for-PyTorch) learning engines
# each get their own container.
GRAPHLEARN_CONTAINER_NAME = "graphlearn"
GRAPHLEARN_TORCH_CONTAINER_NAME = "graphlearn-torch"
DATASET_CONTAINER_NAME = "dataset"
MARS_CONTAINER_NAME = "mars"
VINEYARD_CONTAINER_NAME = "vineyard"
Loading

0 comments on commit a7ec218

Please sign in to comment.