From a7ec2184736e7654a4a194dd4de9904f0229c673 Mon Sep 17 00:00:00 2001
From: husimplicity <36654893+husimplicity@users.noreply.github.com>
Date: Tue, 2 Apr 2024 10:47:07 +0800
Subject: [PATCH] feat(learning): Integrate GLTorch with GraphScope in
 Server-Client Mode with K8S Deployment Support (#3624)

Co-authored-by: Hongyi ZHANG <50618951+Zhanghyi@users.noreply.github.com>
Co-authored-by: Jia Zhibin <56682441+Jia-zb@users.noreply.github.com>
---
 coordinator/gscoordinator/cluster_builder.py  |  88 +++-
 coordinator/gscoordinator/constants.py        |   3 +-
 .../gscoordinator/kubernetes_launcher.py      | 347 ++++++++++++--
 .../gscoordinator/launch_graphlearn_torch.py  |  55 ++-
 coordinator/gscoordinator/launcher.py         |   2 +-
 coordinator/gscoordinator/local_launcher.py   |  11 +-
 coordinator/gscoordinator/object_manager.py   |   7 +-
 .../gscoordinator/operator_launcher.py        |   2 +-
 .../servicer/graphscope_one/service.py        |  15 +-
 coordinator/gscoordinator/utils.py            |  12 +
 docs/learning_engine/guide_and_examples.md    |  12 +
 .../tutorial_node_classification_pyg_k8s.md   | 425 ++++++++++++++++++
 learning_engine/graphlearn-for-pytorch        |   2 +-
 python/graphscope/client/session.py           |  25 +-
 python/graphscope/config.py                   |  15 +-
 .../graphscope/deploy/kubernetes/cluster.py   |   4 +-
 .../learning/gl_torch_examples/README.md      |  23 +
 .../learning/gl_torch_examples/k8s/README.md  |  23 +
 .../gl_torch_examples/k8s/__init__.py         |  14 +
 .../learning/gl_torch_examples/k8s/client.py  | 293 ++++++++++++
 .../gl_torch_examples/k8s/client.yaml         |  68 +++
 .../gl_torch_examples/k8s/k8s_launch.py       |  61 +++
 .../learning/gl_torch_examples/local.py       |  15 +
 .../gl_torch_examples/local_sc_ddp.py         | 306 +++++++++++++
 python/graphscope/learning/gl_torch_graph.py  |  19 +
 python/graphscope/learning/utils.py           |  42 ++
 python/setup.py                               |   4 +-
 27 files changed, 1798 insertions(+), 95 deletions(-)
 create mode 100644 docs/learning_engine/tutorial_node_classification_pyg_k8s.md
 create mode 100644 python/graphscope/learning/gl_torch_examples/README.md
 create mode 100644 python/graphscope/learning/gl_torch_examples/k8s/README.md
 create mode 100644 python/graphscope/learning/gl_torch_examples/k8s/__init__.py
 create mode 100755 python/graphscope/learning/gl_torch_examples/k8s/client.py
 create mode 100644 python/graphscope/learning/gl_torch_examples/k8s/client.yaml
 create mode 100644 python/graphscope/learning/gl_torch_examples/k8s/k8s_launch.py
 create mode 100644 python/graphscope/learning/gl_torch_examples/local_sc_ddp.py
 create mode 100644 python/graphscope/learning/utils.py

diff --git a/coordinator/gscoordinator/cluster_builder.py b/coordinator/gscoordinator/cluster_builder.py
index 3553a697b8b2..2fe6cff6acb0 100644
--- a/coordinator/gscoordinator/cluster_builder.py
+++ b/coordinator/gscoordinator/cluster_builder.py
@@ -33,9 +33,10 @@
 from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME
 from gscoordinator.constants import DATASET_CONTAINER_NAME
+from gscoordinator.constants import GRAPHLEARN_CONTAINER_NAME
+from gscoordinator.constants import GRAPHLEARN_TORCH_CONTAINER_NAME
 from gscoordinator.constants import INTERACTIVE_EXECUTOR_CONTAINER_NAME
 from gscoordinator.constants import INTERACTIVE_FRONTEND_CONTAINER_NAME
-from gscoordinator.constants import LEARNING_CONTAINER_NAME
 from gscoordinator.utils import parse_as_glog_level
 from gscoordinator.version import __version__
@@ -63,7 +64,8 @@ def __init__(
         self,
         config: Config,
         engine_pod_prefix,
-        learning_start_port,
+        graphlearn_start_port,
+        graphlearn_torch_start_port,
     ):
         self._instance_id = config.session.instance_id
self._glog_level = parse_as_glog_level(config.log_level) @@ -81,7 +83,8 @@ def __init__( self._with_analytical = launcher_config.engine.enable_gae self._with_analytical_java = launcher_config.engine.enable_gae_java self._with_interactive = launcher_config.engine.enable_gie - self._with_learning = launcher_config.engine.enable_gle + self._with_graphlearn = launcher_config.engine.enable_gle + self._with_graphlearn_torch = launcher_config.engine.enable_glt self._with_mars = launcher_config.mars.enable def load_base64_json(string): @@ -108,7 +111,8 @@ def load_base64_json(string): self._analytical_java_image = f"{image_prefix}/analytical-java:{tag}" self._interactive_frontend_image = f"{image_prefix}/interactive-frontend:{tag}" self._interactive_executor_image = f"{image_prefix}/interactive-executor:{tag}" - self._learning_image = f"{image_prefix}/learning:{tag}" + self._graphlearn_image = f"{image_prefix}/graphlearn:{tag}" + self._graphlearn_torch_image = f"{image_prefix}/graphlearn-torch:{tag}" self._dataset_image = f"{image_prefix}/dataset:{tag}" self._vineyard_deployment = config.vineyard.deployment_name @@ -121,12 +125,14 @@ def load_base64_json(string): self._engine_pod_prefix = engine_pod_prefix self._analytical_prefix = "gs-analytical-" self._interactive_frontend_prefix = "gs-interactive-frontend-" - self._learning_prefix = "gs-learning-" + self._graphlearn_prefix = "gs-graphlearn-" + self._graphlearn_torch_prefix = "gs-graphlearn-torch-" self._vineyard_prefix = "vineyard-" self._mars_scheduler_name_prefix = "mars-scheduler-" self._mars_service_name_prefix = "mars-" - self._learning_start_port = learning_start_port + self._graphlearn_start_port = graphlearn_start_port + self._graphlearn_torch_start_port = graphlearn_torch_start_port self._engine_labels = { "app.kubernetes.io/name": "graphscope", @@ -255,9 +261,9 @@ def get_interactive_executor_container(self, volume_mounts): ) return container - def get_learning_container(self, volume_mounts): - name = LEARNING_CONTAINER_NAME - image = self._learning_image + def get_graphlearn_container(self, volume_mounts): + name = GRAPHLEARN_CONTAINER_NAME + image = self._graphlearn_image args = ["tail", "-f", "/dev/null"] resource = self._engine_resources.gle_resource container = self.get_engine_container_helper( @@ -265,7 +271,26 @@ def get_learning_container(self, volume_mounts): ) container.ports = [ kube_client.V1ContainerPort(container_port=p) - for p in range(self._learning_start_port, self._learning_start_port + 1000) + for p in range( + self._graphlearn_start_port, self._graphlearn_start_port + 1000 + ) + ] + return container + + def get_graphlearn_torch_container(self, volume_mounts): + name = GRAPHLEARN_TORCH_CONTAINER_NAME + image = self._graphlearn_torch_image + args = ["tail", "-f", "/dev/null"] + resource = self._engine_resources.glt_resource + container = self.get_engine_container_helper( + name, image, args, volume_mounts, resource + ) + container.ports = [ + kube_client.V1ContainerPort(container_port=p) + for p in range( + self._graphlearn_torch_start_port, + self._graphlearn_torch_start_port + 1000, + ) ] return container @@ -338,9 +363,13 @@ def get_engine_pod_spec(self): volume_mounts=engine_volume_mounts ) ) - if self._with_learning: + if self._with_graphlearn: containers.append( - self.get_learning_container(volume_mounts=engine_volume_mounts) + self.get_graphlearn_container(volume_mounts=engine_volume_mounts) + ) + if self._with_graphlearn_torch: + containers.append( + 
self.get_graphlearn_torch_container(volume_mounts=engine_volume_mounts) ) if self._with_dataset: @@ -397,10 +426,10 @@ def get_engine_headless_service(self): ) return service - def get_learning_service(self, object_id, start_port): + def get_graphlearn_service(self, object_id, start_port): service_type = self._service_type num_workers = self._num_workers - name = self.get_learning_service_name(object_id) + name = self.get_graphlearn_service_name(object_id) ports = [] for i in range(start_port, start_port + num_workers): port = kube_client.V1ServicePort(name=f"{name}-{i}", port=i, protocol="TCP") @@ -413,10 +442,30 @@ def get_learning_service(self, object_id, start_port): ) return service - def get_learning_ports(self, start_port): + def get_graphlearn_torch_service(self, object_id, start_port): + service_type = self._service_type + num_workers = self._num_workers + name = self.get_graphlearn_torch_service_name(object_id) + ports = [] + for i in range(start_port, start_port + num_workers): + port = kube_client.V1ServicePort(name=f"{name}-{i}", port=i, protocol="TCP") + ports.append(port) + service_spec = ResourceBuilder.get_service_spec( + service_type, ports, self._engine_labels, "Local" + ) + service = ResourceBuilder.get_service( + self._namespace, name, service_spec, self._engine_labels + ) + return service + + def get_graphlearn_ports(self, start_port): num_workers = self._num_workers return [i for i in range(start_port, start_port + num_workers)] + def get_graphlearn_torch_ports(self, start_port): + num_loaders = 4 + return [i for i in range(start_port, start_port + num_loaders)] + @property def engine_stateful_set_name(self): return f"{self._engine_pod_prefix}{self._instance_id}" @@ -444,11 +493,14 @@ def get_vineyard_service_endpoint(self, api_client): assert len(endpoints) > 0 return endpoints[0] - def get_learning_service_name(self, object_id): - return f"{self._learning_prefix}{object_id}" + def get_graphlearn_service_name(self, object_id): + return f"{self._graphlearn_prefix}{object_id}" + + def get_graphlearn_torch_service_name(self, object_id): + return f"{self._graphlearn_torch_prefix}{object_id}" def get_graphlearn_service_endpoint(self, api_client, object_id, pod_host_ip_list): - service_name = self.get_learning_service_name(object_id) + service_name = self.get_graphlearn_service_name(object_id) service_type = self._service_type core_api = kube_client.CoreV1Api(api_client) if service_type == "NodePort": diff --git a/coordinator/gscoordinator/constants.py b/coordinator/gscoordinator/constants.py index ed0db819a656..fa8624705836 100644 --- a/coordinator/gscoordinator/constants.py +++ b/coordinator/gscoordinator/constants.py @@ -2,7 +2,8 @@ ANALYTICAL_CONTAINER_NAME = "engine" INTERACTIVE_FRONTEND_CONTAINER_NAME = "frontend" INTERACTIVE_EXECUTOR_CONTAINER_NAME = "executor" -LEARNING_CONTAINER_NAME = "learning" +GRAPHLEARN_CONTAINER_NAME = "graphlearn" +GRAPHLEARN_TORCH_CONTAINER_NAME = "graphlearn-torch" DATASET_CONTAINER_NAME = "dataset" MARS_CONTAINER_NAME = "mars" VINEYARD_CONTAINER_NAME = "vineyard" diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py index 762e2a45d56f..aca70b6475a9 100644 --- a/coordinator/gscoordinator/kubernetes_launcher.py +++ b/coordinator/gscoordinator/kubernetes_launcher.py @@ -27,6 +27,8 @@ import sys import time +from graphscope.proto import message_pb2 + from gscoordinator.cluster_builder import EngineCluster from gscoordinator.cluster_builder import MarsCluster @@ -56,8 +58,9 @@ from 
graphscope.proto import types_pb2 from gscoordinator.constants import ANALYTICAL_CONTAINER_NAME +from gscoordinator.constants import GRAPHLEARN_CONTAINER_NAME +from gscoordinator.constants import GRAPHLEARN_TORCH_CONTAINER_NAME from gscoordinator.constants import INTERACTIVE_EXECUTOR_CONTAINER_NAME -from gscoordinator.constants import LEARNING_CONTAINER_NAME from gscoordinator.launcher import AbstractLauncher from gscoordinator.utils import ANALYTICAL_ENGINE_PATH from gscoordinator.utils import GRAPHSCOPE_HOME @@ -66,6 +69,7 @@ from gscoordinator.utils import ResolveMPICmdPrefix from gscoordinator.utils import delegate_command_to_pod from gscoordinator.utils import parse_as_glog_level +from gscoordinator.utils import replace_string_in_dict from gscoordinator.utils import run_kube_cp_command logger = logging.getLogger("graphscope") @@ -84,6 +88,7 @@ def __init__(self, config: Config): self._api_client = resolve_api_client() self._core_api = kube_client.CoreV1Api(self._api_client) self._apps_api = kube_client.AppsV1Api(self._api_client) + self._pytorchjobs_api = kube_client.CustomObjectsApi(self._api_client) self._resource_object = ResourceManager(self._api_client) self._config: Config = config @@ -192,11 +197,16 @@ def __init__(self, config: Config): self._interactive_pod_name = {} self._interactive_pod_ip = {} self._interactive_pod_host_ip = {} - # learning engine - self._learning_resource_object = {} - self._learning_pod_name = {} - self._learning_pod_ip = {} - self._learning_pod_host_ip = {} + # graphlearn engine + self._graphlearn_resource_object = {} + self._graphlearn_pod_name = {} + self._graphlearn_pod_ip = {} + self._graphlearn_pod_host_ip = {} + # graphlearn_torch engine + self._graphlearn_torch_resource_object = {} + self._graphlearn_torch_pod_name = {} + self._graphlearn_torch_pod_ip = {} + self._graphlearn_torch_pod_host_ip = {} self._analytical_engine_endpoint = None self._mars_service_endpoint = None @@ -209,10 +219,15 @@ def __init__(self, config: Config): # frontend port self._interactive_port = 8233 # 8000 ~ 9000 is exposed - self._learning_start_port = 8000 + self._graphlearn_start_port = 8000 + # 9001 ~ 10001 is exposed + self._graphlearn_torch_start_port = 9001 self._graphlearn_services = {} - self._learning_instance_processes = {} + self._graphlearn_instance_processes = {} + + self._graphlearn_torch_services = {} + self._graphlearn_torch_instance_processes = {} # workspace self._instance_workspace = os.path.join(WORKSPACE, self._instance_id) @@ -242,7 +257,8 @@ def _build_engine_cluster(self): return EngineCluster( config=self._config, engine_pod_prefix=self._engine_pod_prefix, - learning_start_port=self._learning_start_port, + graphlearn_start_port=self._graphlearn_start_port, + graphlearn_torch_start_port=self._graphlearn_torch_start_port, ) def get_coordinator_owner_references(self): @@ -390,7 +406,10 @@ def deploy_engine(self, engine_type, object_id=None): engine_type == "interactive" ) self._config.kubernetes_launcher.engine.enable_gle = ( - engine_type == "learning" + engine_type == "graphlearn" + ) + self._config.kubernetes_launcher.engine.enable_glt = ( + engine_type == "graphlearn-torch" ) self._engine_cluster = self._build_engine_cluster() @@ -487,14 +506,20 @@ def deploy_interactive_engine(self, object_id): return pod_name_list, pod_ip_list, pod_host_ip_list - def deploy_learning_engine(self, object_id): - return self.deploy_engine("learning", object_id) + def deploy_graphlearn_engine(self, object_id): + return self.deploy_engine("graphlearn", object_id) + + 
def deploy_graphlearn_torch_engine(self, object_id): + return self.deploy_engine("graphlearn-torch", object_id) def delete_interactive_engine(self, object_id): self.delete_engine_stateful_set_with_object_id("interactive", object_id) - def delete_learning_engine(self, object_id): - self.delete_engine_stateful_set_with_object_id("learning", object_id) + def delete_graphlearn_engine(self, object_id): + self.delete_engine_stateful_set_with_object_id("graphlearn", object_id) + + def delete_graphlearn_torch_engine(self, object_id): + self.delete_engine_stateful_set_with_object_id("graphlearn-torch", object_id) def _allocate_interactive_engine(self, object_id): # check the interactive engine flag @@ -848,16 +873,26 @@ def _create_frontend_service(self): response = self._core_api.create_namespaced_service(self._namespace, service) self._resource_object.append(response) - def _create_learning_service(self, object_id): - logger.info("Creating learning service...") - service = self._engine_cluster.get_learning_service( - object_id, self._learning_start_port + def _create_graphlearn_service(self, object_id): + logger.info("Creating graphlearn service...") + service = self._engine_cluster.get_graphlearn_service( + object_id, self._graphlearn_start_port ) service.metadata.owner_references = self._owner_references response = self._core_api.create_namespaced_service(self._namespace, service) self._graphlearn_services[object_id] = response self._resource_object.append(response) + def _create_graphlearn_torch_service(self, object_id): + logger.info("Creating graphlearn torch service...") + service = self._engine_cluster.get_graphlearn_torch_service( + object_id, self._graphlearn_torch_start_port + ) + service.metadata.owner_references = self._owner_references + response = self._core_api.create_namespaced_service(self._namespace, service) + self._graphlearn_torch_services[object_id] = response + self._resource_object.append(response) + def get_engine_config(self): config = { "vineyard_service_name": self._engine_cluster.vineyard_service_name, @@ -1261,17 +1296,27 @@ def stop(self, is_dangling=False): self._serving = False logger.info("Kubernetes launcher stopped") - def _allocate_learning_engine(self, object_id): - # check the learning engine flag + def _allocate_graphlearn_engine(self, object_id): + # check the graphlearn engine flag if not self._config.kubernetes_launcher.engine.enable_gle: - raise NotImplementedError("Learning engine not enabled") + raise NotImplementedError("GraphLearn engine not enabled") + + # allocate graphlearn engine based on the mode + if self._deploy_mode == "eager": + return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list + return self.deploy_graphlearn_engine(object_id) - # allocate learning engine based on the mode + def _allocate_graphlearn_torch_engine(self, object_id): + # check the graphlearn torch engine flag + if not self._config.kubernetes_launcher.engine.enable_glt: + raise NotImplementedError("GraphLearn torch engine not enabled") + + # allocate graphlearn engine based on the mode if self._deploy_mode == "eager": return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list - return self.deploy_learning_engine(object_id) + return self.deploy_graphlearn_torch_engine(object_id) - def _distribute_learning_process( + def _distribute_graphlearn_process( self, pod_name_list, pod_host_ip_list, object_id, handle, config ): # allocate service for ports @@ -1286,7 +1331,9 @@ def _distribute_learning_process( f"{pod_name}:{port}" for pod_name, port in zip( 
pod_name_list, - self._engine_cluster.get_learning_ports(self._learning_start_port), + self._engine_cluster.get_graphlearn_ports( + self._graphlearn_start_port + ), ) ] ) @@ -1296,12 +1343,12 @@ def _distribute_learning_process( ).decode("utf-8", errors="ignore") # launch the server - self._learning_instance_processes[object_id] = [] + self._graphlearn_instance_processes[object_id] = [] for pod_index, pod in enumerate(self._pod_name_list): - container = LEARNING_CONTAINER_NAME + container = GRAPHLEARN_CONTAINER_NAME sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}" cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}" - logger.debug("launching learning server: %s", " ".join(cmd)) + # logger.debug("launching learning server: %s", " ".join(cmd)) proc = subprocess.Popen( shlex.split(cmd), stdout=subprocess.PIPE, @@ -1318,30 +1365,184 @@ def _distribute_learning_process( suppressed=(not logger.isEnabledFor(logging.DEBUG)), ) setattr(proc, "stdout_watcher", stdout_watcher) - self._learning_instance_processes[object_id].append(proc) + self._graphlearn_instance_processes[object_id].append(proc) # Create Service - self._create_learning_service(object_id) + self._create_graphlearn_service(object_id) # update the port usage record - self._learning_start_port += len(pod_name_list) + self._graphlearn_start_port += len(pod_name_list) # parse the service hosts and ports return self._engine_cluster.get_graphlearn_service_endpoint( self._api_client, object_id, pod_host_ip_list ) - def create_learning_instance(self, object_id, handle, config, learning_backend): - pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id) - if not pod_name_list or not pod_host_ip_list: - raise RuntimeError("Failed to allocate learning engine") - return self._distribute_learning_process( - pod_name_list, pod_host_ip_list, object_id, handle, config + def _distribute_graphlearn_torch_process( + self, pod_name_list, pod_ip_list, object_id, handle, config + ): + # allocate service for ports + # prepare arguments + handle = json.loads( + base64.b64decode(handle.encode("utf-8", errors="ignore")).decode( + "utf-8", errors="ignore" + ) + ) + + ports = self._engine_cluster.get_graphlearn_torch_ports( + self._graphlearn_torch_start_port + ) + handle["master_addr"] = pod_ip_list[0] + handle["server_client_master_port"] = ports[0] + server_list = [f"{pod_ip_list[0]}:{ports[i]}" for i in range(4)] + + server_handle = base64.b64encode( + json.dumps(handle).encode("utf-8", errors="ignore") + ).decode("utf-8", errors="ignore") + + # launch the server + self._graphlearn_torch_instance_processes[object_id] = [] + for pod_index, pod in enumerate(self._pod_name_list): + container = GRAPHLEARN_TORCH_CONTAINER_NAME + sub_cmd = f"env PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python \ + python3 -m gscoordinator.launch_graphlearn_torch \ + {server_handle} {config} {pod_index}" + cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}" + # logger.debug("launching learning server: %s", " ".join(cmd)) + proc = subprocess.Popen( + shlex.split(cmd), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + encoding="utf-8", + errors="replace", + universal_newlines=True, + bufsize=1, + ) + stdout_watcher = PipeWatcher( + proc.stdout, + sys.stdout, + suppressed=(not logger.isEnabledFor(logging.DEBUG)), + ) + + time.sleep(5) + logger.debug("process status: %s", proc.poll()) + + setattr(proc, "stdout_watcher", stdout_watcher) + 
self._graphlearn_torch_instance_processes[object_id].append(proc) + + # Create Service + self._create_graphlearn_torch_service(object_id) + # update the port usage record + self._graphlearn_torch_start_port += len(pod_name_list) + + # prepare config map for client scripts + config_map = kube_client.V1ConfigMap( + api_version="v1", + kind="ConfigMap", + metadata=kube_client.V1ObjectMeta( + name="graphlearn-torch-client-config", + namespace=self._namespace, + ), + data=handle["client_content"], + ) + self._core_api.create_namespaced_config_map(self._namespace, config_map) + + # prepare the manifest + pytorch_job_manifest = replace_string_in_dict( + handle["manifest"], "${MASTER_ADDR}", handle["master_addr"] ) + # parse the pytorchjob yaml + group = pytorch_job_manifest["apiVersion"].split("/")[0] + version = pytorch_job_manifest["apiVersion"].split("/")[1] + name = pytorch_job_manifest["metadata"]["name"] + namespace = pytorch_job_manifest["metadata"]["namespace"] + plural = "pytorchjobs" # This is PyTorchJob CRD's plural name + + try: + # create PyTorchJob + api_response = self._pytorchjobs_api.create_namespaced_custom_object( + group=group, + version=version, + namespace=namespace, + plural=plural, + body=pytorch_job_manifest, + ) + logger.info(api_response) + except K8SApiException as e: + logger.info( + f"Exception when calling CustomObjectsApi->create_namespaced_custom_object: {e}" + ) + raise + + # set Watcher to monitor the state of the PyTorchJob + w = kube_watch.Watch() + + # loop checking the state of PyTorchJob + for event in w.stream( + self._pytorchjobs_api.list_namespaced_custom_object, + group, + version, + namespace, + plural, + ): + pytorch_job = event["object"] + if pytorch_job.get("metadata", {}).get("name") == name: + status = pytorch_job.get("status", {}) + if status: # check status existence + conditions = status.get("conditions", []) + for condition in conditions: + if ( + condition.get("type") == "Succeeded" + and condition.get("status") == "True" + ): + logger.info(f"PyTorchJob {name} has succeeded!") + w.stop() + break + elif ( + condition.get("type") == "Failed" + and condition.get("status") == "True" + ): + logger.info(f"PyTorchJob {name} has failed!") + w.stop() + break + + self.close_graphlearn_torch_client(group, name, version, plural, namespace) + + return server_list + + def create_learning_instance(self, object_id, handle, config, learning_backend): + if learning_backend == message_pb2.LearningBackend.GRAPHLEARN: + pod_name_list, _, pod_host_ip_list = self._allocate_graphlearn_engine( + object_id + ) + if not pod_name_list or not pod_host_ip_list: + raise RuntimeError("Failed to allocate learning engine") + return self._distribute_graphlearn_process( + pod_name_list, pod_host_ip_list, object_id, handle, config + ) + elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH: + pod_name_list, pod_ip_list, pod_host_ip_list = ( + self._allocate_graphlearn_torch_engine(object_id) + ) + if not pod_name_list or not pod_host_ip_list: + raise RuntimeError("Failed to allocate learning engine") + return self._distribute_graphlearn_torch_process( + pod_name_list, pod_ip_list, object_id, handle, config + ) + else: + raise ValueError("invalid learning backend") - def close_learning_instance(self, object_id): + def close_learning_instance(self, object_id, learning_backend): + if learning_backend == message_pb2.LearningBackend.GRAPHLEARN: + self.close_graphlearn_instance(object_id) + elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH: + 
self.close_graphlearn_torch_instance(object_id) + else: + raise ValueError("invalid learning backend") + + def close_graphlearn_instance(self, object_id): if self._deploy_mode == "lazy": - self.delete_learning_engine(object_id) + self.delete_graphlearn_engine(object_id) return - if object_id not in self._learning_instance_processes: + if object_id not in self._graphlearn_instance_processes: return # delete the services target = self._graphlearn_services[object_id] @@ -1356,13 +1557,75 @@ def close_learning_instance(self, object_id): logger.exception("Failed to delete graphlearn service for %s", object_id) # terminate the process - for proc in self._learning_instance_processes[object_id]: + for proc in self._graphlearn_instance_processes[object_id]: try: proc.terminate() proc.wait(1) except Exception: # pylint: disable=broad-except logger.exception("Failed to terminate graphlearn server") - self._learning_instance_processes[object_id].clear() + self._graphlearn_instance_processes[object_id].clear() + + def close_graphlearn_torch_instance(self, object_id): + if self._deploy_mode == "lazy": + self.delete_graphlearn_torch_engine(object_id) + return + if object_id not in self._graphlearn_torch_instance_processes: + return + # delete the services + target = self._graphlearn_torch_services[object_id] + try: + delete_kubernetes_object( + api_client=self._api_client, + target=target, + wait=self._waiting_for_delete, + timeout_seconds=self._timeout_seconds, + ) + except Exception: # pylint: disable=broad-except + logger.exception( + "Failed to delete graphlearn torch service for %s", object_id + ) + + # terminate the process + for proc in self._graphlearn_torch_instance_processes[object_id]: + try: + proc.terminate() + proc.wait(1) + except Exception: # pylint: disable=broad-except + logger.exception("Failed to terminate graphlearn torch server") + self._graphlearn_torch_instance_processes[object_id].clear() + + def close_graphlearn_torch_client(self, group, name, version, plural, namespace): + # clear PyTorchJob + logger.info(f"Deleting PyTorchJob {name}...") + try: + response = self._pytorchjobs_api.delete_namespaced_custom_object( + group=group, + name=name, + version=version, + plural=plural, + namespace=namespace, + body=kube_client.V1DeleteOptions( + propagation_policy="Foreground", + ), + ) + logger.info(f"PyTorchJob {name} deleted. Response: {response}") + except K8SApiException as e: + logger.info( + f"Exception when calling CustomObjectsApi->delete_namespaced_custom_object: {e}" + ) + + try: + response = self._core_api.delete_namespaced_config_map( + name="graphlearn-torch-client-config", + namespace=self._namespace, + ) + logger.info( + f"ConfigMap graphlearn-torch-client-config deleted. 
Response: {response}" + ) + except K8SApiException as e: + logger.info( + f"Exception when calling CoreV1Api->delete_namespaced_config_map: {e}" + ) class ResourceManager(object): diff --git a/coordinator/gscoordinator/launch_graphlearn_torch.py b/coordinator/gscoordinator/launch_graphlearn_torch.py index 4650161aaa08..06b4dfa08364 100644 --- a/coordinator/gscoordinator/launch_graphlearn_torch.py +++ b/coordinator/gscoordinator/launch_graphlearn_torch.py @@ -19,13 +19,16 @@ import base64 import json import logging -import os.path as osp import sys +import graphscope import graphscope.learning.graphlearn_torch as glt import torch from graphscope.learning.gl_torch_graph import GLTorchGraph +graphscope.set_option(show_log=True) +graphscope.set_option(log_level="DEBUG") + logger = logging.getLogger("graphscope") @@ -39,6 +42,33 @@ def decode_arg(arg): ) +def extract_node_type_names(edges): + node_type_names = set() + for edge in edges: + node_type_names.update([edge[0], edge[-1]]) + return node_type_names + + +def init_node_pb(handle, server_rank, node_type_names): + node_pb = ( + glt.data.VineyardPartitionBook( + str(handle["vineyard_socket"]), + str(handle["fragments"][server_rank]), + list(node_type_names)[0], + ) + if len(node_type_names) == 1 + else { + node_type_name: glt.data.VineyardPartitionBook( + str(handle["vineyard_socket"]), + str(handle["fragments"][server_rank]), + node_type_name, + ) + for node_type_name in node_type_names + } + ) + return node_pb + + def run_server_proc(proc_rank, handle, config, server_rank, dataset): glt.distributed.init_server( num_servers=handle["num_servers"], @@ -49,36 +79,42 @@ def run_server_proc(proc_rank, handle, config, server_rank, dataset): num_rpc_threads=16, is_dynamic=True, ) - logger.info(f"-- [Server {server_rank}] Waiting for exit ...") glt.distributed.wait_and_shutdown_server() - logger.info(f"-- [Server {server_rank}] Exited ...") def launch_graphlearn_torch_server(handle, config, server_rank): logger.info(f"-- [Server {server_rank}] Initializing server ...") - edge_dir = config.pop("edge_dir") random_node_split = config.pop("random_node_split") - dataset = glt.distributed.DistDataset(edge_dir=edge_dir) + edges = config.pop("edges") + node_type_names = extract_node_type_names(edges) + + dataset = glt.distributed.DistDataset( + edge_dir=edge_dir, + num_partitions=handle["num_servers"], + partition_idx=server_rank, + node_pb=init_node_pb(handle, server_rank, node_type_names), + ) dataset.load_vineyard( - vineyard_id=str(handle["vineyard_id"]), + vineyard_id=str(handle["fragments"][server_rank]), vineyard_socket=handle["vineyard_socket"], + edges=edges, **config, ) if random_node_split is not None: dataset.random_node_split(**random_node_split) - logger.info(f"-- [Server {server_rank}] Initializing server ...") + logger.info(f"-- [Server {server_rank}] Running server ...") torch.multiprocessing.spawn( fn=run_server_proc, args=(handle, config, server_rank, dataset), nprocs=1 ) + logger.info(f"-- [Server {server_rank}] Server exited.") if __name__ == "__main__": if len(sys.argv) < 3: logger.info( "Usage: ./launch_graphlearn_torch.py ", - file=sys.stderr, ) sys.exit(-1) @@ -87,7 +123,4 @@ def launch_graphlearn_torch_server(handle, config, server_rank): server_index = int(sys.argv[3]) config = GLTorchGraph.reverse_transform_config(config) - logger.info( - f"launch_graphlearn_torch_server handle: {handle} config: {config} server_index: {server_index}" - ) launch_graphlearn_torch_server(handle, config, server_index) diff --git 
a/coordinator/gscoordinator/launcher.py b/coordinator/gscoordinator/launcher.py index 308695c23672..0c51364a9d3b 100644 --- a/coordinator/gscoordinator/launcher.py +++ b/coordinator/gscoordinator/launcher.py @@ -109,7 +109,7 @@ def close_interactive_instance(self, object_id: int): pass @abstractmethod - def close_learning_instance(self, object_id: int): + def close_learning_instance(self, object_id: int, learning_backend: int): pass @abstractmethod diff --git a/coordinator/gscoordinator/local_launcher.py b/coordinator/gscoordinator/local_launcher.py index 10faaa41c098..acaf5c00baab 100644 --- a/coordinator/gscoordinator/local_launcher.py +++ b/coordinator/gscoordinator/local_launcher.py @@ -317,7 +317,7 @@ def _create_graphlearn_torch_instance(self, object_id, handle, config): server_client_master_port = get_free_port("localhost") handle["server_client_master_port"] = server_client_master_port - + handle["master_addr"] = "localhost" server_list = [f"localhost:{server_client_master_port}"] # for train, val and test for _ in range(3): @@ -347,7 +347,7 @@ def _create_graphlearn_torch_instance(self, object_id, handle, config): config, str(index), ] - logger.debug("launching graphlearn_torch server: %s", " ".join(str(cmd))) + # logger.debug("launching graphlearn_torch server: %s", " ".join(cmd)) proc = subprocess.Popen( cmd, @@ -359,11 +359,16 @@ def _create_graphlearn_torch_instance(self, object_id, handle, config): universal_newlines=True, bufsize=1, ) + logger.debug("suppressed: %s", (not logger.isEnabledFor(logging.DEBUG))) stdout_watcher = PipeWatcher( proc.stdout, sys.stdout, suppressed=(not logger.isEnabledFor(logging.DEBUG)), ) + + time.sleep(5) + logger.debug("process status: %s", proc.poll()) + setattr(proc, "stdout_watcher", stdout_watcher) self._learning_instance_processes[object_id].append(proc) return server_list @@ -387,7 +392,7 @@ def close_interactive_instance(self, object_id): process.wait(timeout=self._timeout_seconds) return process - def close_learning_instance(self, object_id): + def close_learning_instance(self, object_id, learning_backend=0): if object_id not in self._learning_instance_processes: return diff --git a/coordinator/gscoordinator/object_manager.py b/coordinator/gscoordinator/object_manager.py index cd68fb0daeff..a8d27463bcf5 100644 --- a/coordinator/gscoordinator/object_manager.py +++ b/coordinator/gscoordinator/object_manager.py @@ -61,8 +61,11 @@ def submit(self, message, bindings=None, request_options=None): class LearningInstanceManager(object): - def __init__(self, object_id): - self.type = "gle_manager" + def __init__(self, object_id, learning_backend): + if learning_backend == 0: + self.type = "gle_manager" + else: + self.type = "glt_manager" self.object_id = object_id diff --git a/coordinator/gscoordinator/operator_launcher.py b/coordinator/gscoordinator/operator_launcher.py index 35de83fdbbac..aa4793a046f6 100644 --- a/coordinator/gscoordinator/operator_launcher.py +++ b/coordinator/gscoordinator/operator_launcher.py @@ -95,7 +95,7 @@ def close_analytical_instance(self): def close_interactive_instance(self, object_id): pass - def close_learning_instance(self, object_id): + def close_learning_instance(self, object_id, learning_backend): pass def launch_etcd(self): diff --git a/coordinator/gscoordinator/servicer/graphscope_one/service.py b/coordinator/gscoordinator/servicer/graphscope_one/service.py index a03f17b42d1e..904f27d13c54 100644 --- a/coordinator/gscoordinator/servicer/graphscope_one/service.py +++ 
b/coordinator/gscoordinator/servicer/graphscope_one/service.py @@ -457,7 +457,6 @@ def _match_frontend_endpoint(pattern, lines): def CreateLearningInstance(self, request, context): object_id = request.object_id - logger.info("Create learning instance with object id %ld", object_id) handle, config, learning_backend = ( request.handle, request.config, @@ -467,13 +466,15 @@ def CreateLearningInstance(self, request, context): endpoints = self._launcher.create_learning_instance( object_id, handle, config, learning_backend ) - self._object_manager.put(object_id, LearningInstanceManager(object_id)) + self._object_manager.put( + object_id, LearningInstanceManager(object_id, learning_backend) + ) except Exception as e: context.set_code(grpc.StatusCode.ABORTED) context.set_details( f"Create learning instance failed: ${e}. The traceback is: {traceback.format_exc()}" ) - self._launcher.close_learning_instance(object_id) + self._launcher.close_learning_instance(object_id, learning_backend) self._object_manager.pop(object_id) return message_pb2.CreateLearningInstanceResponse() return message_pb2.CreateLearningInstanceResponse( @@ -504,7 +505,9 @@ def CloseLearningInstance(self, request, context): self._object_manager.pop(object_id) logger.info("Close learning instance with object id %ld", object_id) try: - self._launcher.close_learning_instance(object_id) + self._launcher.close_learning_instance( + object_id, request.learning_backend + ) except Exception as e: context.set_code(grpc.StatusCode.ABORTED) context.set_details( @@ -534,7 +537,9 @@ def cleanup(self, cleanup_instance=True, is_dangling=False): elif obj.type == "gie_manager": self._launcher.close_interactive_instance(obj.object_id) elif obj.type == "gle_manager": - self._launcher.close_learning_instance(obj.object_id) + self._launcher.close_learning_instance(obj.object_id, 0) + elif obj.type == "glt_manager": + self._launcher.close_learning_instance(obj.object_id, 1) if op_type is not None: dag_def = create_single_op_dag(op_type, config) diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index bc20f008c719..78bbf68833dc 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -2113,3 +2113,15 @@ def _check_cypher_task(endpoint): raise TimeoutError( f"{server.capitalize()} check query failed: {error_message}" ) + + +def replace_string_in_dict(dict_obj, old, new): + if isinstance(dict_obj, dict): + for key, value in dict_obj.items(): + dict_obj[key] = replace_string_in_dict(value, old, new) + elif isinstance(dict_obj, list): + for index, item in enumerate(dict_obj): + dict_obj[index] = replace_string_in_dict(item, old, new) + elif isinstance(dict_obj, str): + return dict_obj.replace(old, new) + return dict_obj diff --git a/docs/learning_engine/guide_and_examples.md b/docs/learning_engine/guide_and_examples.md index fa97be8bbf53..f06f787e41c3 100644 --- a/docs/learning_engine/guide_and_examples.md +++ b/docs/learning_engine/guide_and_examples.md @@ -67,4 +67,16 @@ ho2 to train a PyG model using GraphScope on your local machine. 
``` ^^^^^^^^^^^^^^ Training a Node Classification Model(PyG) on Your Local Machine +```` + +````{panels} +:header: text-center +:column: col-lg-12 p-2 + +```{link-button} tutorial_node_classification_pyg_k8s.html +:text: Tutorial +:classes: btn-block stretched-link +``` +^^^^^^^^^^^^^^ +Tutorial: Training a Node Classification Model (PyG) on a K8S Cluster ```` \ No newline at end of file diff --git a/docs/learning_engine/tutorial_node_classification_pyg_k8s.md b/docs/learning_engine/tutorial_node_classification_pyg_k8s.md new file mode 100644 index 000000000000..844d1ea3e151 --- /dev/null +++ b/docs/learning_engine/tutorial_node_classification_pyg_k8s.md @@ -0,0 +1,425 @@ +# Tutorial: Training a Node Classification Model (PyG) on a K8S Cluster + +This tutorial presents a server-client example that illustrates how GraphScope trains the GraphSAGE model (implemented in PyG) for a node classification task on a Kubernetes cluster. + +## Set parameters & load graph + +```python +import graphscope as gs +from graphscope.dataset import load_ogbn_arxiv + +gs.set_option(log_level="DEBUG") +gs.set_option(show_log=True) + +params = { + "NUM_SERVER_NODES": 2, + "NUM_CLIENT_NODES": 2, +} + +# load the ogbn_arxiv graph as an example. +sess = gs.session( + with_dataset=True, + k8s_service_type="NodePort", + k8s_vineyard_mem="8Gi", + k8s_engine_mem="8Gi", + vineyard_shared_mem="8Gi", + k8s_image_pull_policy="IfNotPresent", + k8s_image_tag="0.26.0a20240115-x86_64", + num_workers=params["NUM_SERVER_NODES"], +) +g = load_ogbn_arxiv(sess=sess, prefix="/dataset/ogbn_arxiv") +``` + +## Launch the Server Engine +```python +glt_graph = gs.graphlearn_torch( + g, + edges=[ + ("paper", "citation", "paper"), + ], + node_features={ + "paper": [f"feat_{i}" for i in range(128)], + }, + node_labels={ + "paper": "label", + }, + edge_dir="out", + random_node_split={ + "num_val": 0.1, + "num_test": 0.1, + }, + num_clients=params["NUM_CLIENT_NODES"], + # Specify the client yaml with the client pods' configuration. + manifest_path="./client.yaml", + # Specify the client folder path that contains the client scripts. 
+ client_folder_path="./", +) + +print("Exiting...") +``` + +## Configure the parameters for client pods +```yaml +apiVersion: "kubeflow.org/v1" +kind: PyTorchJob +metadata: + name: graphlearn-torch-client + namespace: default +spec: + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphlearn-torch-client:0.26.0a20240115-x86_64 + imagePullPolicy: IfNotPresent + command: + - bash + - -c + - |- + python3 /workspace/client.py --node_rank 0 --master_addr ${MASTER_ADDR} --num_server_nodes ${NUM_SERVER_NODES} --num_client_nodes ${NUM_CLIENT_NODES} + volumeMounts: + - mountPath: /dev/shm + name: cache-volume + - mountPath: /workspace + name: client-volume + volumes: + - name: cache-volume + emptyDir: + medium: Memory + sizeLimit: "8G" + - name: client-volume + configMap: + name: graphlearn-torch-client-config + Worker: + replicas: ${NUM_WORKER_REPLICAS} + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphlearn-torch-client:0.26.0a20240115-x86_64 + imagePullPolicy: IfNotPresent + command: + - bash + - -c + - |- + python3 /workspace/client.py --node_rank $((${MY_POD_NAME: -1}+1)) --master_addr ${MASTER_ADDR} --group_master ${GROUP_MASTER} --num_server_nodes ${NUM_SERVER_NODES} --num_client_nodes ${NUM_CLIENT_NODES} + env: + - name: GROUP_MASTER + value: graphlearn-torch-client-master-0 + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumeMounts: + - mountPath: /dev/shm + name: cache-volume + - mountPath: /workspace + name: client-volume + volumes: + - name: cache-volume + emptyDir: + medium: Memory + sizeLimit: "8G" + - name: client-volume + configMap: + name: graphlearn-torch-client-config +``` + +## Write training and testing script + +### Import packages +```python + +import argparse +import time +from typing import List + +import torch +import torch.nn.functional as F +from torch.distributed.algorithms.join import Join +from torch.nn.parallel import DistributedDataParallel +from torch_geometric.nn import GraphSAGE + +import graphscope as gs +import graphscope.learning.graphlearn_torch as glt +from graphscope.learning.gl_torch_graph import GLTorchGraph +from graphscope.learning.graphlearn_torch.typing import Split + +gs.set_option(log_level="DEBUG") +gs.set_option(show_log=True) +``` + +### Define test function +```python +@torch.no_grad() +def test(model, test_loader, dataset_name): + model.eval() + xs = [] + y_true = [] + for i, batch in enumerate(test_loader): + if i == 0: + device = batch.x.device + batch.x = batch.x.to(torch.float32) # TODO + x = model.module(batch.x, batch.edge_index)[: batch.batch_size] + xs.append(x.cpu()) + y_true.append(batch.y[: batch.batch_size].cpu()) + del batch + + xs = [t.to(device) for t in xs] + y_true = [t.to(device) for t in y_true] + y_pred = torch.cat(xs, dim=0).argmax(dim=-1, keepdim=True) + y_true = torch.cat(y_true, dim=0) + test_acc = sum((y_pred.T == y_true.T)[0]) / len(y_true.T) + + return test_acc.item() +``` +### Define the loader and training process +```python +def run_client_proc( + glt_graph, + group_master: str, + num_servers: int, + num_clients: int, + client_rank: int, + server_rank_list: List[int], + dataset_name: str, + epochs: int, + batch_size: int, + training_pg_master_port: int, +): + + print("-- Initializing client ...") + glt.distributed.init_client( + num_servers=num_servers, + 
num_clients=num_clients,
+        client_rank=client_rank,
+        master_addr=glt_graph.master_addr,
+        master_port=glt_graph.server_client_master_port,
+        num_rpc_threads=4,
+        client_group_name="k8s_glt_client",
+        is_dynamic=True,
+    )
+
+    # Initialize training process group of PyTorch.
+    current_ctx = glt.distributed.get_context()
+
+    torch.distributed.init_process_group(
+        backend="gloo",
+        rank=current_ctx.rank,
+        world_size=current_ctx.world_size,
+        init_method="tcp://{}:{}".format(group_master, training_pg_master_port),
+    )
+
+    device = torch.device("cpu")
+    # Create distributed neighbor loader on remote server for training.
+    print("-- Creating training dataloader ...")
+    train_loader = glt.distributed.DistNeighborLoader(
+        data=None,
+        num_neighbors=[5, 3, 2],
+        input_nodes=Split.train,
+        batch_size=batch_size,
+        shuffle=True,
+        collect_features=True,
+        to_device=device,
+        worker_options=glt.distributed.RemoteDistSamplingWorkerOptions(
+            server_rank=server_rank_list,
+            num_workers=1,
+            worker_devices=[torch.device("cpu")],
+            worker_concurrency=1,
+            buffer_size="256MB",
+            prefetch_size=1,
+            glt_graph=glt_graph,
+            workload_type="train",
+        ),
+    )
+
+    # Create distributed neighbor loader on remote server for testing.
+    print("-- Creating testing dataloader ...")
+    test_loader = glt.distributed.DistNeighborLoader(
+        data=None,
+        num_neighbors=[5, 3, 2],
+        input_nodes=Split.test,
+        batch_size=batch_size,
+        shuffle=False,
+        collect_features=True,
+        to_device=device,
+        worker_options=glt.distributed.RemoteDistSamplingWorkerOptions(
+            server_rank=server_rank_list,
+            num_workers=1,
+            worker_devices=[torch.device("cpu")],
+            worker_concurrency=1,
+            buffer_size="256MB",
+            prefetch_size=1,
+            glt_graph=glt_graph,
+            workload_type="test",
+        ),
+    )
+
+    # Define model and optimizer.
+    print("-- Initializing model and optimizer ...")
+    model = GraphSAGE(
+        in_channels=128,
+        hidden_channels=128,
+        num_layers=3,
+        out_channels=47,
+    ).to(device)
+    model = DistributedDataParallel(model, device_ids=None)
+    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
+
+    # Train and test.
+    print("-- Start training and testing ...")
+    # epochs and dataset_name come from the function arguments (parsed CLI
+    # options); avoid re-assigning them here so the flags remain effective.
+    for epoch in range(0, epochs):
+        model.train()
+        start = time.time()
+        with Join([model]):
+            for batch in train_loader:
+                optimizer.zero_grad()
+                batch.x = batch.x.to(torch.float32)  # TODO
+                out = model(batch.x, batch.edge_index)[: batch.batch_size].log_softmax(
+                    dim=-1
+                )
+                loss = F.nll_loss(out, torch.flatten(batch.y[: batch.batch_size]))
+                loss.backward()
+                optimizer.step()
+
+        end = time.time()
+        print(f"-- Epoch: {epoch:03d}, Loss: {loss:.4f}, Epoch Time: {end - start}")
+        torch.distributed.barrier()
+        # Test accuracy.
+        if epoch == 0 or epoch > (epochs // 2):
+            test_acc = test(model, test_loader, dataset_name)
+            print(f"-- Test Accuracy: {test_acc:.4f}")
+            torch.distributed.barrier()
+
+    print("-- Shutting down ...")
+    glt.distributed.shutdown_client()
+
+    print("-- Exited ...")
+```
+### Main function
+```python
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Arguments for distributed training of supervised SAGE with servers."
+ ) + parser.add_argument( + "--dataset", + type=str, + default="ogbn-arxiv", + help="The name of ogbn arxiv.", + ) + parser.add_argument( + "--num_server_nodes", + type=int, + default=2, + help="Number of server nodes for remote sampling.", + ) + parser.add_argument( + "--num_client_nodes", + type=int, + default=1, + help="Number of client nodes for training.", + ) + parser.add_argument( + "--node_rank", + type=int, + default=0, + help="The node rank of the current role.", + ) + parser.add_argument( + "--epochs", + type=int, + default=10, + help="The number of training epochs. (client option)", + ) + parser.add_argument( + "--batch_size", + type=int, + default=256, + help="Batch size for the training and testing dataloader.", + ) + parser.add_argument( + "--training_pg_master_port", + type=int, + default=9997, + help="The port used for PyTorch's process group initialization across all training processes.", + ) + parser.add_argument( + "--train_loader_master_port", + type=int, + default=9998, + help="The port used for RPC initialization across all sampling workers of training loader.", + ) + parser.add_argument( + "--test_loader_master_port", + type=int, + default=9999, + help="The port used for RPC initialization across all sampling workers of testing loader.", + ) + parser.add_argument( + "--master_addr", + type=str, + default="localhost", + help="The master address of the graphlearn server.", + ) + parser.add_argument( + "--group_master", + type=str, + default="localhost", + help="The master address of the training process group.", + ) + args = parser.parse_args() + + print( + f"--- Distributed training example of supervised SAGE with server-client mode. Client {args.node_rank} ---" + ) + print(f"* dataset: {args.dataset}") + print(f"* total server nodes: {args.num_server_nodes}") + print(f"* total client nodes: {args.num_client_nodes}") + print(f"* node rank: {args.node_rank}") + + num_servers = args.num_server_nodes + num_clients = args.num_client_nodes + + print(f"* epochs: {args.epochs}") + print(f"* batch size: {args.batch_size}") + print(f"* training process group master port: {args.training_pg_master_port}") + print(f"* training loader master port: {args.train_loader_master_port}") + print(f"* testing loader master port: {args.test_loader_master_port}") + + client_rank = args.node_rank + print("--- Loading graph info ...") + glt_graph = GLTorchGraph( + [ + args.master_addr + ":9001", + args.master_addr + ":9002", + args.master_addr + ":9003", + args.master_addr + ":9004", + ] + ) + print("--- Launching client processes ...") + run_client_proc( + glt_graph, + args.group_master, + num_servers, + num_clients, + client_rank, + [server_rank for server_rank in range(num_servers)], + args.dataset, + args.epochs, + args.batch_size, + args.training_pg_master_port, + ) +``` + +## Run the script +```shell +python3 k8s_launch.py +``` \ No newline at end of file diff --git a/learning_engine/graphlearn-for-pytorch b/learning_engine/graphlearn-for-pytorch index 16996cbbf06f..a98382c1540e 160000 --- a/learning_engine/graphlearn-for-pytorch +++ b/learning_engine/graphlearn-for-pytorch @@ -1 +1 @@ -Subproject commit 16996cbbf06fca2eceb097dc529759bd6a5374e6 +Subproject commit a98382c1540e1a5d2c9999995f9652b290505a5f diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index ecdfbdbe4058..2abdb2d380cc 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1335,17 +1335,30 @@ def graphlearn_torch( node_labels=None, 
edge_dir="out", random_node_split=None, + num_clients=1, + manifest_path=None, + client_folder_path="./", ): from graphscope.learning.gl_torch_graph import GLTorchGraph + from graphscope.learning.utils import fill_params_in_yaml + from graphscope.learning.utils import read_folder_files_content handle = { "vineyard_socket": self._engine_config["vineyard_socket"], "vineyard_id": graph.vineyard_id, "fragments": graph.fragments, - "master_addr": "localhost", - "num_servers": 1, - "num_clients": 1, + "num_servers": len(graph.fragments), + "num_clients": num_clients, } + manifest_params = { + "NUM_CLIENT_NODES": handle["num_clients"], + "NUM_SERVER_NODES": handle["num_servers"], + "NUM_WORKER_REPLICAS": handle["num_clients"] - 1, + } + if manifest_path is not None: + handle["manifest"] = fill_params_in_yaml(manifest_path, manifest_params) + if client_folder_path is not None: + handle["client_content"] = read_folder_files_content(client_folder_path) handle = base64.b64encode( json.dumps(handle).encode("utf-8", errors="ignore") @@ -1673,6 +1686,9 @@ def graphlearn_torch( node_labels=None, edge_dir="out", random_node_split=None, + num_clients=1, + manifest_path=None, + client_folder_path="./", ): assert graph is not None, "graph cannot be None" assert ( @@ -1687,4 +1703,7 @@ def graphlearn_torch( node_labels, edge_dir, random_node_split, + num_clients, + manifest_path, + client_folder_path, ) # pylint: disable=protected-access diff --git a/python/graphscope/config.py b/python/graphscope/config.py index 54bcd3dc9a93..8aabe53d14b6 100644 --- a/python/graphscope/config.py +++ b/python/graphscope/config.py @@ -129,7 +129,8 @@ class EngineConfig: # Enable or disable analytical engine with java support. enable_gae_java: bool = False enable_gie: bool = False # Enable or disable interactive engine. - enable_gle: bool = False # Enable or disable learning engine. + enable_gle: bool = False # Enable or disable graphlearn engine. + enable_glt: bool = False # Enable or disable graphlearn_torch engine. 
preemptive: bool = True

@@ -153,9 +154,14 @@ class EngineConfig:
         default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi")
     )
+
+    # Resource for graphlearn_torch pod
+    glt_resource: ResourceConfig = field(
+        default_factory=lambda: ResourceConfig.make_burstable(0.2, "1Gi")
+    )
+
     def post_setup(self):
         valid_engines = set(
-            "analytical,analytical-java,interactive,learning,gae,gae-java,gie,gle".split(
+            "analytical,analytical-java,interactive,graphlearn,graphlearn-torch,gae,gae-java,gie,gle,glt".split(
                 ","
             )
         )
@@ -166,14 +172,17 @@ def post_setup(self):
                 self.enable_gae = True
             if item == "interactive" or item == "gie":
                 self.enable_gie = True
-            if item == "learning" or item == "gle":
+            if item == "graphlearn" or item == "gle":
                 self.enable_gle = True
+            if item == "graphlearn-torch" or item == "glt":
+                self.enable_glt = True
             if item == "analytical-java" or item == "gae-java":
                 self.enable_gae_java = True

         if self.preemptive:
             self.gae_resource.requests = None
             self.gle_resource.requests = None
+            self.glt_resource.requests = None
             self.gie_executor_resource.requests = None
             self.gie_frontend_resource.requests = None

diff --git a/python/graphscope/deploy/kubernetes/cluster.py b/python/graphscope/deploy/kubernetes/cluster.py
index 376c8dab85d3..0fb4484fa19f 100644
--- a/python/graphscope/deploy/kubernetes/cluster.py
+++ b/python/graphscope/deploy/kubernetes/cluster.py
@@ -183,8 +183,8 @@ def _create_role_and_binding(self):
         role = ResourceBuilder.get_role(
             name=self._role_name,
             namespace=self._namespace,
-            api_groups=",apps,extensions",  # The leading comma is necessary, represents for core api group.
-            resources="configmaps,deployments,deployments/status,statefulsets,statefulsets/status,endpoints,events,pods,pods/log,pods/exec,pods/status,services,replicasets",  # noqa: E501
+            api_groups=",apps,extensions,kubeflow.org",  # The leading comma is necessary, represents for core api group.
+            resources="configmaps,deployments,deployments/status,statefulsets,statefulsets/status,endpoints,events,pods,pods/log,pods/exec,pods/status,services,replicasets,pytorchjobs",  # noqa: E501
             verbs="create,delete,get,update,watch,list",
             labels=self._labels,
         )
diff --git a/python/graphscope/learning/gl_torch_examples/README.md b/python/graphscope/learning/gl_torch_examples/README.md
new file mode 100644
index 000000000000..88bfa76632c2
--- /dev/null
+++ b/python/graphscope/learning/gl_torch_examples/README.md
@@ -0,0 +1,23 @@
+# The Local GLTorch Example
+
+This example demonstrates how to run a local server-client GLTorch job.
+
+### Prerequisites
+Install ogb and PyG (matching your installed PyTorch version) by running the following commands:
+```shell
+pip3 install ogb
+pip3 install torch_geometric
+pip3 install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-x.x.x+cpu.html
+```
+
+### Train and evaluate
+1. For local single-machine mode:
+```shell
+python3 local.py
+```
+2. For local multi-server/multi-client mode with DDP.
+Here `num_server_nodes` is the number of server nodes and `num_client_nodes` is the number of client nodes.
+The server nodes are responsible for sampling, and the client nodes are responsible for the model computation.
+```shell
+python3 local_sc_ddp.py --num_server_nodes 2 --num_client_nodes 2
+```
diff --git a/python/graphscope/learning/gl_torch_examples/k8s/README.md b/python/graphscope/learning/gl_torch_examples/k8s/README.md
new file mode 100644
index 000000000000..65777a3b638f
--- /dev/null
+++ b/python/graphscope/learning/gl_torch_examples/k8s/README.md
@@ -0,0 +1,23 @@
+# How to run on Kubernetes
+
+### 1. Install Kubeflow
+
+We use [Kubeflow](https://github.com/kubeflow/training-operator) to deploy **GLT** jobs on K8s clusters. Make sure Kubeflow is installed before running the following examples.
+
+### 2. Launch a GraphScope cluster on K8s and start training
+`client.yaml` is an example of launching GLTorch jobs with a default setting (2 servers and 2 workers) using the PyTorch operator.
+`client.py` is an example GLT training script, and `k8s_launch.py` is the script that launches the GraphScope K8s cluster and the client jobs.
+
+First, specify the corresponding file paths in the `gs.graphlearn_torch` call in the `k8s_launch.py` file:
+- `manifest_path`: the path of the configuration YAML file (`client.yaml` in this case).
+- `client_folder_path`: the folder containing the client scripts (`./` by default).
+
+Then, configure the parameters in `params` of the `k8s_launch.py` file:
+- `NUM_SERVER_NODES`: the number of server nodes
+- `NUM_WORKER_NODES`: the number of worker nodes
+
+Now, run the following command to start the training and evaluation process.
+```shell
+python3 k8s_launch.py
+```
+You can check the status of the job by running `kubectl logs [pod_name]`.
\ No newline at end of file
diff --git a/python/graphscope/learning/gl_torch_examples/k8s/__init__.py b/python/graphscope/learning/gl_torch_examples/k8s/__init__.py
new file mode 100644
index 000000000000..39742aa7ff59
--- /dev/null
+++ b/python/graphscope/learning/gl_torch_examples/k8s/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
diff --git a/python/graphscope/learning/gl_torch_examples/k8s/client.py b/python/graphscope/learning/gl_torch_examples/k8s/client.py
new file mode 100755
index 000000000000..6b9000fc5943
--- /dev/null
+++ b/python/graphscope/learning/gl_torch_examples/k8s/client.py
@@ -0,0 +1,293 @@
+# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================== + +import argparse +import time +from typing import List + +import torch +import torch.nn.functional as F +from torch.distributed.algorithms.join import Join +from torch.nn.parallel import DistributedDataParallel +from torch_geometric.nn import GraphSAGE + +import graphscope as gs +import graphscope.learning.graphlearn_torch as glt +from graphscope.learning.gl_torch_graph import GLTorchGraph +from graphscope.learning.graphlearn_torch.typing import Split + +gs.set_option(log_level="DEBUG") +gs.set_option(show_log=True) + + +@torch.no_grad() +def test(model, test_loader, dataset_name): + model.eval() + xs = [] + y_true = [] + for i, batch in enumerate(test_loader): + if i == 0: + device = batch.x.device + batch.x = batch.x.to(torch.float32) # TODO + x = model.module(batch.x, batch.edge_index)[: batch.batch_size] + xs.append(x.cpu()) + y_true.append(batch.y[: batch.batch_size].cpu()) + del batch + + xs = [t.to(device) for t in xs] + y_true = [t.to(device) for t in y_true] + y_pred = torch.cat(xs, dim=0).argmax(dim=-1, keepdim=True) + y_true = torch.cat(y_true, dim=0) + test_acc = sum((y_pred.T == y_true.T)[0]) / len(y_true.T) + + return test_acc.item() + + +def run_client_proc( + glt_graph, + group_master: str, + num_servers: int, + num_clients: int, + client_rank: int, + server_rank_list: List[int], + dataset_name: str, + epochs: int, + batch_size: int, + training_pg_master_port: int, +): + + print("-- Initializing client ...") + glt.distributed.init_client( + num_servers=num_servers, + num_clients=num_clients, + client_rank=client_rank, + master_addr=glt_graph.master_addr, + master_port=glt_graph.server_client_master_port, + num_rpc_threads=4, + client_group_name="k8s_glt_client", + is_dynamic=True, + ) + + # Initialize training process group of PyTorch. + current_ctx = glt.distributed.get_context() + + torch.distributed.init_process_group( + backend="gloo", + rank=current_ctx.rank, + world_size=current_ctx.world_size, + init_method="tcp://{}:{}".format(group_master, training_pg_master_port), + ) + + device = torch.device("cpu") + # Create distributed neighbor loader on remote server for training. + print("-- Creating training dataloader ...") + train_loader = glt.distributed.DistNeighborLoader( + data=None, + num_neighbors=[5, 3, 2], + input_nodes=Split.train, + batch_size=batch_size, + shuffle=True, + collect_features=True, + to_device=device, + worker_options=glt.distributed.RemoteDistSamplingWorkerOptions( + server_rank=server_rank_list, + num_workers=1, + worker_devices=[torch.device("cpu")], + worker_concurrency=1, + buffer_size="256MB", + prefetch_size=1, + glt_graph=glt_graph, + workload_type="train", + ), + ) + + # Create distributed neighbor loader on remote server for testing. + print("-- Creating testing dataloader ...") + test_loader = glt.distributed.DistNeighborLoader( + data=None, + num_neighbors=[5, 3, 2], + input_nodes=Split.test, + batch_size=batch_size, + shuffle=False, + collect_features=True, + to_device=device, + worker_options=glt.distributed.RemoteDistSamplingWorkerOptions( + server_rank=server_rank_list, + num_workers=1, + worker_devices=[torch.device("cpu")], + worker_concurrency=1, + buffer_size="256MB", + prefetch_size=1, + glt_graph=glt_graph, + workload_type="test", + ), + ) + + # Define model and optimizer. 
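+    # The GraphSAGE dimensions below match this example's graph: 128 input
+    # channels for the 128 "feat_*" node features served by GraphScope, and
+    # a 47-way classifier head (ogbn-arxiv itself defines 40 label classes,
+    # so a 40-way head would also suffice here).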
+    print("-- Initializing model and optimizer ...")
+    model = GraphSAGE(
+        in_channels=128,
+        hidden_channels=128,
+        num_layers=3,
+        out_channels=47,
+    ).to(device)
+    model = DistributedDataParallel(model, device_ids=None)
+    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
+
+    # Train and test, using the epoch count passed in from the CLI.
+    print("-- Start training and testing ...")
+    for epoch in range(0, epochs):
+        model.train()
+        start = time.time()
+        with Join([model]):
+            for batch in train_loader:
+                optimizer.zero_grad()
+                batch.x = batch.x.to(torch.float32)  # TODO
+                out = model(batch.x, batch.edge_index)[: batch.batch_size].log_softmax(
+                    dim=-1
+                )
+                loss = F.nll_loss(out, torch.flatten(batch.y[: batch.batch_size]))
+                loss.backward()
+                optimizer.step()
+
+        end = time.time()
+        print(f"-- Epoch: {epoch:03d}, Loss: {loss:.4f}, Epoch Time: {end - start}")
+        torch.distributed.barrier()
+        # Test accuracy.
+        if epoch == 0 or epoch > (epochs // 2):
+            test_acc = test(model, test_loader, dataset_name)
+            print(f"-- Test Accuracy: {test_acc:.4f}")
+            torch.distributed.barrier()
+
+    print("-- Shutting down ...")
+    glt.distributed.shutdown_client()
+
+    print("-- Exited ...")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Arguments for distributed training of supervised SAGE with servers."
+    )
+    parser.add_argument(
+        "--dataset",
+        type=str,
+        default="ogbn-arxiv",
+        help="The name of the OGB dataset.",
+    )
+    parser.add_argument(
+        "--num_server_nodes",
+        type=int,
+        default=2,
+        help="Number of server nodes for remote sampling.",
+    )
+    parser.add_argument(
+        "--num_client_nodes",
+        type=int,
+        default=1,
+        help="Number of client nodes for training.",
+    )
+    parser.add_argument(
+        "--node_rank",
+        type=int,
+        default=0,
+        help="The node rank of the current role.",
+    )
+    parser.add_argument(
+        "--epochs",
+        type=int,
+        default=10,
+        help="The number of training epochs. (client option)",
+    )
+    parser.add_argument(
+        "--batch_size",
+        type=int,
+        default=256,
+        help="Batch size for the training and testing dataloader.",
+    )
+    parser.add_argument(
+        "--training_pg_master_port",
+        type=int,
+        default=9997,
+        help="The port used for PyTorch's process group initialization across all training processes.",
+    )
+    parser.add_argument(
+        "--train_loader_master_port",
+        type=int,
+        default=9998,
+        help="The port used for RPC initialization across all sampling workers of training loader.",
+    )
+    parser.add_argument(
+        "--test_loader_master_port",
+        type=int,
+        default=9999,
+        help="The port used for RPC initialization across all sampling workers of testing loader.",
+    )
+    parser.add_argument(
+        "--master_addr",
+        type=str,
+        default="localhost",
+        help="The master address of the graphlearn server.",
+    )
+    parser.add_argument(
+        "--group_master",
+        type=str,
+        default="localhost",
+        help="The master address of the training process group.",
+    )
+    args = parser.parse_args()
+
+    print(
+        f"--- Distributed training example of supervised SAGE with server-client mode. 
Client {args.node_rank} ---" + ) + print(f"* dataset: {args.dataset}") + print(f"* total server nodes: {args.num_server_nodes}") + print(f"* total client nodes: {args.num_client_nodes}") + print(f"* node rank: {args.node_rank}") + + num_servers = args.num_server_nodes + num_clients = args.num_client_nodes + + print(f"* epochs: {args.epochs}") + print(f"* batch size: {args.batch_size}") + print(f"* training process group master port: {args.training_pg_master_port}") + print(f"* training loader master port: {args.train_loader_master_port}") + print(f"* testing loader master port: {args.test_loader_master_port}") + + client_rank = args.node_rank + print("--- Loading graph info ...") + glt_graph = GLTorchGraph( + [ + args.master_addr + ":9001", + args.master_addr + ":9002", + args.master_addr + ":9003", + args.master_addr + ":9004", + ] + ) + print("--- Launching client processes ...") + run_client_proc( + glt_graph, + args.group_master, + num_servers, + num_clients, + client_rank, + [server_rank for server_rank in range(num_servers)], + args.dataset, + args.epochs, + args.batch_size, + args.training_pg_master_port, + ) diff --git a/python/graphscope/learning/gl_torch_examples/k8s/client.yaml b/python/graphscope/learning/gl_torch_examples/k8s/client.yaml new file mode 100644 index 000000000000..cd82078e2d7e --- /dev/null +++ b/python/graphscope/learning/gl_torch_examples/k8s/client.yaml @@ -0,0 +1,68 @@ +apiVersion: "kubeflow.org/v1" +kind: PyTorchJob +metadata: + name: graphlearn-torch-client + namespace: default +spec: + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphlearn-torch-client:0.26.0a20240115-x86_64 + imagePullPolicy: IfNotPresent + command: + - bash + - -c + - |- + python3 /workspace/client.py --node_rank 0 --master_addr ${MASTER_ADDR} --num_server_nodes ${NUM_SERVER_NODES} --num_client_nodes ${NUM_CLIENT_NODES} + volumeMounts: + - mountPath: /dev/shm + name: cache-volume + - mountPath: /workspace + name: client-volume + volumes: + - name: cache-volume + emptyDir: + medium: Memory + sizeLimit: "8G" + - name: client-volume + configMap: + name: graphlearn-torch-client-config + Worker: + replicas: ${NUM_WORKER_REPLICAS} + restartPolicy: OnFailure + template: + spec: + containers: + - name: pytorch + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphlearn-torch-client:0.26.0a20240115-x86_64 + imagePullPolicy: IfNotPresent + command: + - bash + - -c + - |- + python3 /workspace/client.py --node_rank $((${MY_POD_NAME: -1}+1)) --master_addr ${MASTER_ADDR} --group_master ${GROUP_MASTER} --num_server_nodes ${NUM_SERVER_NODES} --num_client_nodes ${NUM_CLIENT_NODES} + env: + - name: GROUP_MASTER + value: graphlearn-torch-client-master-0 + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumeMounts: + - mountPath: /dev/shm + name: cache-volume + - mountPath: /workspace + name: client-volume + volumes: + - name: cache-volume + emptyDir: + medium: Memory + sizeLimit: "8G" + - name: client-volume + configMap: + name: graphlearn-torch-client-config diff --git a/python/graphscope/learning/gl_torch_examples/k8s/k8s_launch.py b/python/graphscope/learning/gl_torch_examples/k8s/k8s_launch.py new file mode 100644 index 000000000000..83cceddc0223 --- /dev/null +++ b/python/graphscope/learning/gl_torch_examples/k8s/k8s_launch.py @@ -0,0 +1,61 @@ +# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import graphscope as gs +from graphscope.dataset import load_ogbn_arxiv + +gs.set_option(log_level="DEBUG") +gs.set_option(show_log=True) + +params = { + "NUM_SERVER_NODES": 2, + "NUM_CLIENT_NODES": 2, +} + +# load the ogbn_arxiv graph as an example. +sess = gs.session( + with_dataset=True, + k8s_service_type="NodePort", + k8s_vineyard_mem="8Gi", + k8s_engine_mem="8Gi", + vineyard_shared_mem="8Gi", + k8s_image_pull_policy="IfNotPresent", + k8s_image_tag="0.26.0a20240115-x86_64", + num_workers=params["NUM_SERVER_NODES"], +) +g = load_ogbn_arxiv(sess=sess, prefix="/dataset/ogbn_arxiv") + +glt_graph = gs.graphlearn_torch( + g, + edges=[ + ("paper", "citation", "paper"), + ], + node_features={ + "paper": [f"feat_{i}" for i in range(128)], + }, + node_labels={ + "paper": "label", + }, + edge_dir="out", + random_node_split={ + "num_val": 0.1, + "num_test": 0.1, + }, + num_clients=params["NUM_CLIENT_NODES"], + manifest_path="./client.yaml", + client_folder_path="./", +) + +print("Exiting...") diff --git a/python/graphscope/learning/gl_torch_examples/local.py b/python/graphscope/learning/gl_torch_examples/local.py index 2f71ffd0cf66..ff326da85f76 100644 --- a/python/graphscope/learning/gl_torch_examples/local.py +++ b/python/graphscope/learning/gl_torch_examples/local.py @@ -1,3 +1,18 @@ +# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + import time import torch diff --git a/python/graphscope/learning/gl_torch_examples/local_sc_ddp.py b/python/graphscope/learning/gl_torch_examples/local_sc_ddp.py new file mode 100644 index 000000000000..fe7cf46b268f --- /dev/null +++ b/python/graphscope/learning/gl_torch_examples/local_sc_ddp.py @@ -0,0 +1,306 @@ +# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import argparse +import pickle +import time +from typing import List + +import torch +import torch.nn.functional as F +from torch.distributed.algorithms.join import Join +from torch.nn.parallel import DistributedDataParallel +from torch_geometric.nn import GraphSAGE + +import graphscope as gs +import graphscope.learning.graphlearn_torch as glt +from graphscope.learning.graphlearn_torch.typing import Split + +gs.set_option(log_level="DEBUG") +gs.set_option(show_log=True) + + +@torch.no_grad() +def test(model, test_loader, dataset_name): + model.eval() + xs = [] + y_true = [] + for i, batch in enumerate(test_loader): + if i == 0: + device = batch.x.device + batch.x = batch.x.to(torch.float32) # TODO + x = model.module(batch.x, batch.edge_index)[: batch.batch_size] + xs.append(x.cpu()) + y_true.append(batch.y[: batch.batch_size].cpu()) + del batch + + xs = [t.to(device) for t in xs] + y_true = [t.to(device) for t in y_true] + y_pred = torch.cat(xs, dim=0).argmax(dim=-1, keepdim=True) + y_true = torch.cat(y_true, dim=0) + test_acc = sum((y_pred.T[0] == y_true)) / len(y_true) + + return test_acc.item() + + +def run_client_proc( + glt_graph, + num_servers: int, + num_clients: int, + client_rank: int, + server_rank_list: List[int], + dataset_name: str, + epochs: int, + batch_size: int, + training_pg_master_port: int, + train_loader_master_port: int, + test_loader_master_port: int, +): + + print("-- Initializing client ...") + glt.distributed.init_client( + num_servers=num_servers, + num_clients=num_clients, + client_rank=client_rank, + master_addr=glt_graph.master_addr, + master_port=glt_graph.server_client_master_port, + num_rpc_threads=4, + is_dynamic=True, + ) + + # Initialize training process group of PyTorch. + current_ctx = glt.distributed.get_context() + + torch.distributed.init_process_group( + backend="gloo", + rank=current_ctx.rank, + world_size=current_ctx.world_size, + init_method="tcp://{}:{}".format( + glt_graph.master_addr, training_pg_master_port + ), + ) + + device = torch.device("cpu") + # Create distributed neighbor loader on remote server for training. + print("-- Creating training dataloader ...") + train_loader = glt.distributed.DistNeighborLoader( + data=None, + num_neighbors=[10, 5, 3], + input_nodes=Split.train, + batch_size=batch_size, + shuffle=True, + collect_features=True, + to_device=device, + worker_options=glt.distributed.RemoteDistSamplingWorkerOptions( + server_rank=server_rank_list, + num_workers=1, + worker_devices=[torch.device("cpu")], + worker_concurrency=1, + buffer_size="1GB", + prefetch_size=1, + master_port=train_loader_master_port, + glt_graph=glt_graph, + workload_type="train", + ), + ) + + # Create distributed neighbor loader on remote server for testing. + print("-- Creating testing dataloader ...") + test_loader = glt.distributed.DistNeighborLoader( + data=None, + num_neighbors=[10, 5, 3], + input_nodes=Split.test, + batch_size=batch_size, + shuffle=False, + collect_features=True, + to_device=device, + worker_options=glt.distributed.RemoteDistSamplingWorkerOptions( + server_rank=server_rank_list, + num_workers=1, + worker_devices=[torch.device("cpu")], + worker_concurrency=1, + buffer_size="1GB", + prefetch_size=1, + master_port=test_loader_master_port, + glt_graph=glt_graph, + workload_type="test", + ), + ) + + # Define model and optimizer. 
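+    # DistributedDataParallel with device_ids=None keeps the model on CPU and
+    # synchronizes gradients across all client processes through the "gloo"
+    # process group initialized above, so every client trains on its own
+    # mini-batches while sharing a single set of model weights.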
+    print("-- Initializing model and optimizer ...")
+    model = GraphSAGE(
+        in_channels=128,
+        hidden_channels=256,
+        num_layers=3,
+        out_channels=47,
+    ).to(device)
+    model = DistributedDataParallel(model, device_ids=None)
+    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
+
+    # Train and test, using the epoch count passed in from the CLI.
+    print("-- Start training and testing ...")
+    for epoch in range(0, epochs):
+        model.train()
+        start = time.time()
+        with Join([model]):
+            for batch in train_loader:
+                optimizer.zero_grad()
+                batch.x = batch.x.to(torch.float32)  # TODO
+                out = model(batch.x, batch.edge_index)[: batch.batch_size].log_softmax(
+                    dim=-1
+                )
+                loss = F.nll_loss(out, torch.flatten(batch.y[: batch.batch_size]))
+                loss.backward()
+                optimizer.step()
+
+        end = time.time()
+        print(f"-- Epoch: {epoch:03d}, Loss: {loss:.4f}, Epoch Time: {end - start}")
+        torch.distributed.barrier()
+        # Test accuracy.
+        if epoch == 0 or epoch > (epochs // 2):
+            test_acc = test(model, test_loader, dataset_name)
+            print(f"-- Test Accuracy: {test_acc:.4f}")
+            torch.distributed.barrier()
+
+    print("-- Shutting down ...")
+    glt.distributed.shutdown_client()
+
+    print("-- Exited ...")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Arguments for distributed training of supervised SAGE with servers."
+    )
+    parser.add_argument(
+        "--dataset",
+        type=str,
+        default="ogbn-arxiv",
+        help="The name of the OGB dataset.",
+    )
+    parser.add_argument(
+        "--num_server_nodes",
+        type=int,
+        default=2,
+        help="Number of server nodes for remote sampling.",
+    )
+    parser.add_argument(
+        "--num_client_nodes",
+        type=int,
+        default=2,
+        help="Number of client nodes for training.",
+    )
+    parser.add_argument(
+        "--epochs",
+        type=int,
+        default=10,
+        help="The number of training epochs. (client option)",
+    )
+    parser.add_argument(
+        "--batch_size",
+        type=int,
+        default=256,
+        help="Batch size for the training and testing dataloader.",
+    )
+    parser.add_argument(
+        "--training_pg_master_port",
+        type=int,
+        default=9997,
+        help="The port used for PyTorch's process group initialization across all training processes.",
+    )
+    parser.add_argument(
+        "--train_loader_master_port",
+        type=int,
+        default=9998,
+        help="The port used for RPC initialization across all sampling workers of training loader.",
+    )
+    parser.add_argument(
+        "--test_loader_master_port",
+        type=int,
+        default=9999,
+        help="The port used for RPC initialization across all sampling workers of testing loader.",
+    )
+    args = parser.parse_args()
+
+    print(f"* dataset: {args.dataset}")
+    print(f"* total server nodes: {args.num_server_nodes}")
+    print(f"* total client nodes: {args.num_client_nodes}")
+
+    num_servers = args.num_server_nodes
+    num_clients = args.num_client_nodes
+
+    print(f"* epochs: {args.epochs}")
+    print(f"* batch size: {args.batch_size}")
+    print(f"* training process group master port: {args.training_pg_master_port}")
+    print(f"* training loader master port: {args.train_loader_master_port}")
+    print(f"* testing loader master port: {args.test_loader_master_port}")
+
+    print("--- Launching sampling server processes ...")
+    from graphscope.dataset import load_ogbn_arxiv
+
+    # Load the ogbn-arxiv graph as an example.
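+    # cluster_type="hosts" starts the GraphScope engines as local processes
+    # rather than on Kubernetes; num_workers is set to the number of sampling
+    # servers so that each engine worker can host one GLT server process.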
+ sess = gs.session(cluster_type="hosts", num_workers=num_servers) + g = load_ogbn_arxiv(sess=sess) + + glt_graph = gs.graphlearn_torch( + g, + edges=[ + ("paper", "citation", "paper"), + ], + node_features={ + "paper": [f"feat_{i}" for i in range(128)], + }, + node_labels={ + "paper": "label", + }, + edge_dir="out", + random_node_split={ + "num_val": 0.1, + "num_test": 0.1, + }, + ) + + print("--- Launching client processes ...") + mp_context = torch.multiprocessing.get_context("spawn") + cprocs = [] + for client_rank in range(num_clients): + cproc = mp_context.Process( + target=run_client_proc, + args=( + glt_graph, + num_servers, + num_clients, + client_rank, + [server_rank for server_rank in range(num_servers)], + args.dataset, + args.epochs, + args.batch_size, + args.training_pg_master_port, + args.train_loader_master_port, + args.test_loader_master_port, + ), + ) + cprocs.append(cproc) + for cproc in cprocs: + cproc.start() + for cproc in cprocs: + cproc.join() diff --git a/python/graphscope/learning/gl_torch_graph.py b/python/graphscope/learning/gl_torch_graph.py index f7b1d0191e6a..c0039222638d 100644 --- a/python/graphscope/learning/gl_torch_graph.py +++ b/python/graphscope/learning/gl_torch_graph.py @@ -1,3 +1,22 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + class GLTorchGraph(object): def __init__(self, server_list): assert len(server_list) == 4 diff --git a/python/graphscope/learning/utils.py b/python/graphscope/learning/utils.py new file mode 100644 index 000000000000..7f3ddb26a721 --- /dev/null +++ b/python/graphscope/learning/utils.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os + +import yaml + + +def read_folder_files_content(folder_path): + files_content = {} + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + # make sure it's a file not a directory + if os.path.isfile(file_path): + with open(file_path, "r") as file: + files_content[filename] = file.read() + return files_content + + +def fill_params_in_yaml(file_path, params): + with open(file_path, "r") as file: + yaml_content = file.read() + for param_key, param_value in params.items(): + yaml_content = yaml_content.replace( + "${" + param_key + "}", str(param_value) + ) + return yaml.safe_load(yaml_content) diff --git a/python/setup.py b/python/setup.py index e2eb5e3326fb..8d906dae4f80 100644 --- a/python/setup.py +++ b/python/setup.py @@ -313,8 +313,8 @@ def build_learning_engine(): sys.path.insert( 0, os.path.join(glt_root_path, "graphlearn_torch", "python", "utils") ) - from build import glt_ext_module - from build import glt_v6d_ext_module + from build_glt import glt_ext_module + from build_glt import glt_v6d_ext_module ext_modules.append( glt_ext_module(