From 626501f55c70c3d887c46fe4ded3e1ddc9922185 Mon Sep 17 00:00:00 2001 From: Dongze Li Date: Fri, 14 Jun 2024 12:42:10 +0800 Subject: [PATCH] feat: Start the service when switching graph context on Interactive (#3910) ## What do these changes do? - Start the service when switching graph context - Move `gsctl service ` from GLOBAL to GRAPH context - Move `gsctl create/delete datasource/loaderjob` from GRAPH to GLOBAL context - Display GRAPH_NAME(GRAPH_ID) in `gsctl service status` and `gsctl use GRAPH/GLOBAL` command - Return metadata when uploading file - Add `Y/N` interaction when deleting graph/storedproc/job resource ## Related issue number Fixes --------- Co-authored-by: Zhang Lei --- Makefile | 4 +- .../gscoordinator/flex/core/client_wrapper.py | 11 +- coordinator/gscoordinator/flex/core/utils.py | 24 ++ .../flex/models/upload_file_response.py | 34 ++- .../gscoordinator/flex/openapi/openapi.yaml | 12 + docs/flex/coordinator.md | 2 +- docs/utilities/gs.md | 9 +- flex/openapi/openapi_coordinator.yaml | 9 + .../flex/rest/models/upload_file_response.py | 6 +- python/graphscope/gsctl/commands/__init__.py | 6 +- .../gsctl/commands/interactive/glob.py | 186 ++++++++++----- .../gsctl/commands/interactive/graph.py | 222 ++++++++---------- python/graphscope/gsctl/config.py | 7 + python/graphscope/gsctl/impl/__init__.py | 1 + python/graphscope/gsctl/impl/graph.py | 8 + python/graphscope/gsctl/impl/utils.py | 3 +- python/graphscope/gsctl/utils.py | 27 +-- 17 files changed, 354 insertions(+), 217 deletions(-) diff --git a/Makefile b/Makefile index 1c48747da292..683498372d88 100644 --- a/Makefile +++ b/Makefile @@ -79,9 +79,7 @@ clean: gsctl: cd $(CLIENT_DIR) && \ - python3 setup_gsctl.py bdist_wheel && \ - python3 -m pip install dist/gsctl*.whl --force-reinstall && \ - rm -fr build + python3 ./setup_gsctl.py develop --user client: learning diff --git a/coordinator/gscoordinator/flex/core/client_wrapper.py b/coordinator/gscoordinator/flex/core/client_wrapper.py index 424af453cfde..30068d4ece66 100644 --- a/coordinator/gscoordinator/flex/core/client_wrapper.py +++ b/coordinator/gscoordinator/flex/core/client_wrapper.py @@ -37,6 +37,7 @@ from gscoordinator.flex.core.insight import init_groot_client from gscoordinator.flex.core.interactive import init_hqps_client from gscoordinator.flex.core.utils import encode_datetime +from gscoordinator.flex.core.utils import parse_file_metadata from gscoordinator.flex.models import CreateDataloadingJobResponse from gscoordinator.flex.models import CreateEdgeType from gscoordinator.flex.models import CreateGraphRequest @@ -289,7 +290,8 @@ def upload_file(self, filestorage) -> str: if CLUSTER_TYPE == "HOSTS": filepath = os.path.join(DATASET_WORKSPACE, filestorage.filename) filestorage.save(filepath) - return UploadFileResponse.from_dict({"file_path": filepath}) + metadata = parse_file_metadata(filepath) + return UploadFileResponse.from_dict({"file_path": filepath, "metadata": metadata}) def bind_datasource_in_batch( self, graph_id: str, schema_mapping: SchemaMapping @@ -301,9 +303,10 @@ def bind_datasource_in_batch( schema_mapping_dict["vertex_mappings"], schema_mapping_dict["edge_mappings"], ): - for column_mapping in mapping["column_mappings"]: - if "_property" in column_mapping: - column_mapping["property"] = column_mapping.pop("_property") + if "column_mappings" in mapping and mapping["column_mappings"] is not None: + for column_mapping in mapping["column_mappings"]: + if "_property" in column_mapping: + column_mapping["property"] = 
column_mapping.pop("_property") if ( "source_vertex_mappings" in mapping and "destination_vertex_mappings" in mapping diff --git a/coordinator/gscoordinator/flex/core/utils.py b/coordinator/gscoordinator/flex/core/utils.py index ff6c65fa9c26..609ab85dbf8e 100644 --- a/coordinator/gscoordinator/flex/core/utils.py +++ b/coordinator/gscoordinator/flex/core/utils.py @@ -19,7 +19,9 @@ import datetime import functools import logging +import os import random +import re import socket import string from typing import Union @@ -83,6 +85,28 @@ def get_internal_ip() -> str: return internal_ip +def parse_file_metadata(location: str) -> dict: + """ + Args: + location: optional values: + odps://path/to/file, hdfs://path/to/file, file:///path/to/file + /home/graphscope/path/to/file + """ + metadata = {"datasource": "file"} + path = location + pattern = r"^(odps|hdfs|file|oss|s3)?://([\w/.-]+)$" + match = re.match(pattern, location) + if match: + datasource = match.group(1) + metadata["datasource"] = datasource + if datasource == "file": + path = match.group(2) + if metadata["datasource"] == "file": + _, file_extension = os.path.splitext(path) + metadata["file_type"] = file_extension[1:] + return metadata + + def get_public_ip() -> Union[str, None]: try: response = requests.get("https://api.ipify.org?format=json") diff --git a/coordinator/gscoordinator/flex/models/upload_file_response.py b/coordinator/gscoordinator/flex/models/upload_file_response.py index b14ecb242d1b..30f289473916 100644 --- a/coordinator/gscoordinator/flex/models/upload_file_response.py +++ b/coordinator/gscoordinator/flex/models/upload_file_response.py @@ -12,21 +12,26 @@ class UploadFileResponse(Model): Do not edit the class manually. """ - def __init__(self, file_path=None): # noqa: E501 + def __init__(self, file_path=None, metadata=None): # noqa: E501 """UploadFileResponse - a model defined in OpenAPI :param file_path: The file_path of this UploadFileResponse. # noqa: E501 :type file_path: str + :param metadata: The metadata of this UploadFileResponse. # noqa: E501 + :type metadata: Dict[str, object] """ self.openapi_types = { - 'file_path': str + 'file_path': str, + 'metadata': Dict[str, object] } self.attribute_map = { - 'file_path': 'file_path' + 'file_path': 'file_path', + 'metadata': 'metadata' } self._file_path = file_path + self._metadata = metadata @classmethod def from_dict(cls, dikt) -> 'UploadFileResponse': @@ -61,3 +66,26 @@ def file_path(self, file_path: str): raise ValueError("Invalid value for `file_path`, must not be `None`") # noqa: E501 self._file_path = file_path + + @property + def metadata(self) -> Dict[str, object]: + """Gets the metadata of this UploadFileResponse. + + + :return: The metadata of this UploadFileResponse. + :rtype: Dict[str, object] + """ + return self._metadata + + @metadata.setter + def metadata(self, metadata: Dict[str, object]): + """Sets the metadata of this UploadFileResponse. + + + :param metadata: The metadata of this UploadFileResponse. 
+ :type metadata: Dict[str, object] + """ + if metadata is None: + raise ValueError("Invalid value for `metadata`, must not be `None`") # noqa: E501 + + self._metadata = metadata diff --git a/coordinator/gscoordinator/flex/openapi/openapi.yaml b/coordinator/gscoordinator/flex/openapi/openapi.yaml index 9e93c04bdaa1..5f67843adf4b 100644 --- a/coordinator/gscoordinator/flex/openapi/openapi.yaml +++ b/coordinator/gscoordinator/flex/openapi/openapi.yaml @@ -473,6 +473,11 @@ paths: "200": content: application/json: + example: + file_path: /home/graphscope/path/to/file.csv + metadata: + datasource: file + file_type: csv schema: $ref: '#/components/schemas/UploadFileResponse' description: successful operation @@ -2008,12 +2013,19 @@ components: UploadFileResponse: example: file_path: file_path + metadata: + key: "" properties: file_path: title: file_path type: string + metadata: + additionalProperties: true + title: metadata + type: object required: - file_path + - metadata title: UploadFileResponse LongText: properties: diff --git a/docs/flex/coordinator.md b/docs/flex/coordinator.md index d1290a7154c4..0b47897a8eb5 100644 --- a/docs/flex/coordinator.md +++ b/docs/flex/coordinator.md @@ -1,4 +1,4 @@ -# GraphScope Coordinator +# Coordinator The GraphScope Coordinator serves as a centralized entry point for users, providing a RESTful API that follows the Swagger specification. It supports multiple language SDKs, including Python, and offers a unified interface. The main purpose of the Coordinator is to abstract and standardize the underlying engines and storage systems, shielding users from their complexities. This allows users to interact with the GraphScope platform through a simplified and consistent set of APIs, making it easier for users to understand and utilize the functionalities provided by GraphScope. diff --git a/docs/utilities/gs.md b/docs/utilities/gs.md index 903828e6b559..469bde45cc0e 100644 --- a/docs/utilities/gs.md +++ b/docs/utilities/gs.md @@ -1,23 +1,22 @@ # Command-line Utility `gsctl` -`gsctl` is a command-line utility for GraphScope. It is shipped with `graphscope-client` and provides a set of functionalities to make it easy to use GraphScope. These functionalities include building and testing binaries, managing sessions and resources, and more. +`gsctl` is a command-line utility for GraphScope. It provides a set of functionalities to make it easy to use GraphScope. These functionalities include building and testing binaries, managing sessions and resources, and more. ## Install/Update `gsctl` -Since it is shipped with python package `graphscope-client`, the `gsctl` command will be available in your terminal after installing GraphScope: ```bash -$ pip3 install graphscope-client +$ pip3 install gsctl ``` In some cases, such as development on `gsctl`, you may want to build it from source. To do this, navigate to the directory where the source code is located and run the following command: ```bash -$ cd REPO_HOME/python +$ cd REPO_HOME # If you want to develop gsctl, # please note the entry point is located on: # /python/graphscope/gsctl/gsctl.py -$ pip3 install --editable . +$ make gsctl ``` This will install `gsctl` in an editable mode, which means that any changes you make to the source code will be reflected in the installed version of `gsctl`. 
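For reference, the `metadata` object returned by the upload-file endpoint (see the `UploadFileResponse` schema and the `parse_file_metadata` helper added above) is derived purely from the location string. A minimal standalone sketch of the parsing rules follows — restated here only for illustration, with made-up example paths; the actual helper lives in `coordinator/gscoordinator/flex/core/utils.py`:

```python
import os
import re


def parse_file_metadata(location: str) -> dict:
    """Derive {"datasource": ..., "file_type": ...} from a location string."""
    metadata = {"datasource": "file"}
    path = location
    # Recognized schemes; anything without a scheme is treated as a local file.
    match = re.match(r"^(odps|hdfs|file|oss|s3)?://([\w/.-]+)$", location)
    if match:
        metadata["datasource"] = match.group(1)
        if match.group(1) == "file":
            path = match.group(2)
    if metadata["datasource"] == "file":
        # Only local files report a file_type (the extension without the dot).
        metadata["file_type"] = os.path.splitext(path)[1][1:]
    return metadata


# Local paths and file:// URIs carry a file_type; remote schemes only a datasource.
assert parse_file_metadata("/home/graphscope/person.csv") == {"datasource": "file", "file_type": "csv"}
assert parse_file_metadata("file:///home/graphscope/person.csv") == {"datasource": "file", "file_type": "csv"}
assert parse_file_metadata("hdfs://host/ldbc/person.csv") == {"datasource": "hdfs"}
```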
diff --git a/flex/openapi/openapi_coordinator.yaml b/flex/openapi/openapi_coordinator.yaml index 9201723ce6a4..f71388f4c937 100644 --- a/flex/openapi/openapi_coordinator.yaml +++ b/flex/openapi/openapi_coordinator.yaml @@ -169,9 +169,13 @@ components: UploadFileResponse: required: - file_path + - metadata properties: file_path: type: string + metadata: + type: object + additionalProperties: true LongText: required: @@ -2297,5 +2301,10 @@ paths: application/json: schema: $ref: '#/components/schemas/UploadFileResponse' + example: + file_path: /home/graphscope/path/to/file.csv + metadata: + datasource: file + file_type: csv 500: $ref: "#/components/responses/500" diff --git a/python/graphscope/flex/rest/models/upload_file_response.py b/python/graphscope/flex/rest/models/upload_file_response.py index 5d90a08bef55..29daa96ec0ff 100644 --- a/python/graphscope/flex/rest/models/upload_file_response.py +++ b/python/graphscope/flex/rest/models/upload_file_response.py @@ -28,7 +28,8 @@ class UploadFileResponse(BaseModel): UploadFileResponse """ # noqa: E501 file_path: StrictStr - __properties: ClassVar[List[str]] = ["file_path"] + metadata: Dict[str, Any] + __properties: ClassVar[List[str]] = ["file_path", "metadata"] model_config = { "populate_by_name": True, @@ -81,7 +82,8 @@ def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: return cls.model_validate(obj) _obj = cls.model_validate({ - "file_path": obj.get("file_path") + "file_path": obj.get("file_path"), + "metadata": obj.get("metadata") }) return _obj diff --git a/python/graphscope/gsctl/commands/__init__.py b/python/graphscope/gsctl/commands/__init__.py index 5998bb958dd7..437decd6ed22 100644 --- a/python/graphscope/gsctl/commands/__init__.py +++ b/python/graphscope/gsctl/commands/__init__.py @@ -108,7 +108,11 @@ def get_command_collection(context: Context): commands = click.CommandCollection(sources=[common, interactive]) else: if len(sys.argv) < 2 or sys.argv[1] != "use": - info(f"Using GRAPH {context.context}.", fg="green", bold=True) + info( + f"Using GRAPH {context.graph_name}(id={context.context}).", + fg="green", + bold=True, + ) info("Run `gsctl use GLOBAL` to switch back to GLOBAL context.\n") commands = click.CommandCollection(sources=[common, interactive_graph]) elif is_insight_mode(context.flex): diff --git a/python/graphscope/gsctl/commands/interactive/glob.py b/python/graphscope/gsctl/commands/interactive/glob.py index 4167d7277188..6edb74dcadba 100644 --- a/python/graphscope/gsctl/commands/interactive/glob.py +++ b/python/graphscope/gsctl/commands/interactive/glob.py @@ -19,18 +19,23 @@ import click import yaml +from graphscope.gsctl.impl import bind_datasource_in_batch from graphscope.gsctl.impl import create_graph from graphscope.gsctl.impl import delete_graph_by_id +from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import get_datasource_by_id from graphscope.gsctl.impl import get_graph_id_by_name +from graphscope.gsctl.impl import get_graph_name_by_id +from graphscope.gsctl.impl import get_job_by_id from graphscope.gsctl.impl import list_graphs from graphscope.gsctl.impl import list_jobs from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures -from graphscope.gsctl.impl import restart_service from graphscope.gsctl.impl import start_service -from graphscope.gsctl.impl import stop_service +from graphscope.gsctl.impl import submit_dataloading_job from graphscope.gsctl.impl import switch_context +from graphscope.gsctl.impl import 
unbind_edge_datasource +from graphscope.gsctl.impl import unbind_vertex_datasource from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info @@ -47,19 +52,19 @@ def cli(): @cli.group() def create(): - """Create a new graph in database""" + """Create graph, data source, loader job from file""" pass @cli.group() def delete(): - """Delete a graph by identifier""" + """Delete graph, data source, loader job by id""" pass @cli.group() -def service(): - """Start, stop, and restart the database service""" +def desc(): + """Show job's details by id""" pass @@ -71,11 +76,20 @@ def service(): def use(context, graph_identifier): """Switch to GRAPH context, see identifier with `ls` command""" try: - switch_context(get_graph_id_by_name(graph_identifier)) + graph_identifier = get_graph_id_by_name(graph_identifier) + graph_name = get_graph_name_by_id(graph_identifier) + status = list_service_status() + for s in status: + if s.graph_id == graph_identifier and s.status != "Running": + info( + f"Starting service on graph {graph_name}(id={graph_identifier})..." + ) + start_service(graph_identifier) + switch_context(graph_identifier, graph_name) except Exception as e: err(f"Failed to switch context: {str(e)}") else: - click.secho(f"Using GRAPH {graph_identifier}", fg="green") + click.secho(f"Using GRAPH {graph_name}(id={graph_identifier})", fg="green") @cli.command() @@ -89,7 +103,7 @@ def ls(l): # noqa: F811, E741 # schema tree.create_graph_node(g, recursive=l) if l: - # data source mappin + # data source mapping datasource_mapping = get_datasource_by_id(g.id) tree.create_datasource_mapping_node(g, datasource_mapping) # stored procedure @@ -132,86 +146,134 @@ def graph(filename): # noqa: F811 def graph(graph_identifier): # noqa: F811 """Delete a graph, see graph identifier with `ls` command""" try: - delete_graph_by_id(get_graph_id_by_name(graph_identifier)) + if click.confirm("Do you want to continue?"): + delete_graph_by_id(get_graph_id_by_name(graph_identifier)) + succ(f"Delete graph {graph_identifier} successfully.") except Exception as e: err(f"Failed to delete graph {graph_identifier}: {str(e)}") - else: - succ(f"Delete graph {graph_identifier} successfully.") -@service.command -def stop(): # noqa: F811 - """Stop current database service""" +@create.command +@click.option( + "-g", + "--graph_identifier", + required=True, + help="See graph identifier with `ls` command", +) +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file", +) +def datasource(graph_identifier, filename): # noqa: F811 + """Bind data source mapping from file""" + if not is_valid_file_path(filename): + err(f"Invalid file: {filename}") + return + graph_identifier = get_graph_id_by_name(graph_identifier) try: - stop_service() + datasource = read_yaml_file(filename) + bind_datasource_in_batch(graph_identifier, datasource) except Exception as e: - err(f"Failed to stop service: {str(e)}") + err(f"Failed to bind data source: {str(e)}") else: - succ("Service stopped.") + succ("Bind data source successfully.") -@service.command +@delete.command @click.option( "-g", "--graph_identifier", required=True, help="See graph identifier with `ls` command", ) -def start(graph_identifier): # noqa: F811 - """Start database service on a certain graph""" +@click.option( + "-t", + "--type", + required=True, + help="Vertex or edge type", +) +@click.option( + "-s", + "--source_vertex_type", + required=False, + help="Source vertex type of the edge [edge only]", +) 
+@click.option( + "-d", + "--destination_vertex_type", + required=False, + help="Destination vertex type of the edge [edge only]", +) +def datasource( # noqa: F811 + graph_identifier, type, source_vertex_type, destination_vertex_type +): + """Unbind data source mapping on vertex or edge type""" try: - start_service(get_graph_id_by_name(graph_identifier)) + graph_identifier = get_graph_id_by_name(graph_identifier) + if source_vertex_type is not None and destination_vertex_type is not None: + unbind_edge_datasource( + graph_identifier, type, source_vertex_type, destination_vertex_type + ) + else: + unbind_vertex_datasource(graph_identifier, type) except Exception as e: - err(f"Failed to start service on graph {graph_identifier}: {str(e)}") + err(f"Failed to unbind data source: {str(e)}") else: - succ(f"Start service on graph {graph_identifier} successfully") + succ("Unbind data source successfully.") -@service.command -def restart(): # noqa: F811 - """Restart database service on current graph""" +@create.command() +@click.option( + "-g", + "--graph_identifier", + required=True, + help="See graph identifier with `ls` command", +) +@click.option( + "-f", + "--filename", + required=True, + help="Path of yaml file", +) +def loaderjob(graph_identifier, filename): # noqa: F811 + """Create a data loading job from file""" + if not is_valid_file_path(filename): + err(f"Invalid file: {filename}") + return + graph_identifier = get_graph_id_by_name(graph_identifier) try: - restart_service() + config = read_yaml_file(filename) + jobid = submit_dataloading_job(graph_identifier, config) except Exception as e: - err(f"Failed to restart service: {str(e)}") + err(f"Failed to create a job: {str(e)}") else: - succ("Service restarted.") - - -@service.command -def ls(): # noqa: F811 - """Display current service status""" - - def _construct_and_display_data(status): - head = [ - "STATUS", - "SERVING_GRAPH(IDENTIFIER)", - "CYPHER_ENDPOINT", - "HQPS_ENDPOINT", - "GREMLIN_ENDPOINT", - ] - data = [head] - for s in status: - if s.status == "Stopped": - data.append([s.status, s.graph_id, "-", "-", "-"]) - else: - data.append( - [ - s.status, - s.graph_id, - s.sdk_endpoints.cypher, - s.sdk_endpoints.hqps, - s.sdk_endpoints.gremlin, - ] - ) - terminal_display(data) + succ(f"Create job {jobid} successfully.") + +@delete.command() +@click.argument("identifier", required=True) +def job(identifier): # noqa: F811 + """Cancel a job, see identifier with `ls` command""" try: - status = list_service_status() + if click.confirm("Do you want to continue?"): + delete_job_by_id(identifier) + succ(f"Delete job {identifier} successfully.") + except Exception as e: + err(f"Failed to delete job {identifier}: {str(e)}") + + +@desc.command() +@click.argument("identifier", required=True) +def job(identifier): # noqa: F811 + """Show details of job, see identifier with `ls` command""" + try: + job = get_job_by_id(identifier) except Exception as e: - err(f"Failed to list service status: {str(e)}") + err(f"Failed to get job: {str(e)}") else: - _construct_and_display_data(status) + info(yaml.dump(job.to_dict())) if __name__ == "__main__": diff --git a/python/graphscope/gsctl/commands/interactive/graph.py b/python/graphscope/gsctl/commands/interactive/graph.py index 942972f3db1e..ee2431b2a23e 100644 --- a/python/graphscope/gsctl/commands/interactive/graph.py +++ b/python/graphscope/gsctl/commands/interactive/graph.py @@ -20,26 +20,24 @@ import yaml from graphscope.gsctl.config import get_current_context -from graphscope.gsctl.impl import 
bind_datasource_in_batch from graphscope.gsctl.impl import create_stored_procedure -from graphscope.gsctl.impl import delete_job_by_id from graphscope.gsctl.impl import delete_stored_procedure_by_id from graphscope.gsctl.impl import get_datasource_by_id -from graphscope.gsctl.impl import get_graph_id_by_name -from graphscope.gsctl.impl import get_job_by_id +from graphscope.gsctl.impl import get_graph_name_by_id from graphscope.gsctl.impl import list_graphs -from graphscope.gsctl.impl import list_jobs +from graphscope.gsctl.impl import list_service_status from graphscope.gsctl.impl import list_stored_procedures -from graphscope.gsctl.impl import submit_dataloading_job +from graphscope.gsctl.impl import restart_service +from graphscope.gsctl.impl import start_service +from graphscope.gsctl.impl import stop_service from graphscope.gsctl.impl import switch_context -from graphscope.gsctl.impl import unbind_edge_datasource -from graphscope.gsctl.impl import unbind_vertex_datasource from graphscope.gsctl.utils import TreeDisplay from graphscope.gsctl.utils import err from graphscope.gsctl.utils import info from graphscope.gsctl.utils import is_valid_file_path from graphscope.gsctl.utils import read_yaml_file from graphscope.gsctl.utils import succ +from graphscope.gsctl.utils import terminal_display @click.group() @@ -49,19 +47,25 @@ def cli(): @cli.group() def create(): - """Create stored procedure, data source, loader job from file""" + """Create stored procedure from file""" pass @cli.group() def delete(): - """Delete stored procedure, data source, loader job by id""" + """Delete stored procedure by id""" pass @cli.group() def desc(): - """Show details of job status and stored procedure by id""" + """Show stored procedure's details by id""" + pass + + +@cli.group() +def service(): + """Start, stop, and restart the database service""" pass @@ -73,7 +77,7 @@ def use(): @cli.command() def ls(): # noqa: F811 - """Display schema, stored procedure, and job information""" + """Display schema and stored procedure information""" tree = TreeDisplay() # context current_context = get_current_context() @@ -92,9 +96,6 @@ def ls(): # noqa: F811 # stored procedure stored_procedures = list_stored_procedures(using_graph.id) tree.create_stored_procedure_node(using_graph, stored_procedures) - # job - jobs = list_jobs() - tree.create_job_node(using_graph, jobs) except Exception as e: err(f"Failed to display graph information: {str(e)}") else: @@ -131,141 +132,120 @@ def storedproc(identifier): # noqa: F811 current_context = get_current_context() graph_identifier = current_context.context try: - delete_stored_procedure_by_id(graph_identifier, identifier) + if click.confirm("Do you want to continue?"): + delete_stored_procedure_by_id(graph_identifier, identifier) + succ(f"Delete stored procedure {identifier} successfully.") except Exception as e: err(f"Failed to delete stored procedure: {str(e)}") - else: - succ(f"Delete stored procedure {identifier} successfully.") -@create.command -@click.option( - "-f", - "--filename", - required=True, - help="Path of yaml file", -) -def datasource(filename): # noqa: F811 - """Bind data source mapping from file""" - if not is_valid_file_path(filename): - err(f"Invalid file: {filename}") - return +@desc.command() +@click.argument("identifier", required=True) +def storedproc(identifier): # noqa: F811 + """Show details of stored procedure, see identifier with `ls` command""" current_context = get_current_context() - graph_identifier = current_context.context + graph_id = 
current_context.context try: - datasource = read_yaml_file(filename) - bind_datasource_in_batch(graph_identifier, datasource) + stored_procedures = list_stored_procedures(graph_id) except Exception as e: - err(f"Failed to bind data source: {str(e)}") + err(f"Failed to list stored procedures: {str(e)}") else: - succ("Bind data source successfully.") + if not stored_procedures: + info(f"No stored procedures found on {graph_id}.") + return + specific_stored_procedure_exist = False + for stored_procedure in stored_procedures: + if identifier == stored_procedure.id: + info(yaml.dump(stored_procedure.to_dict())) + specific_stored_procedure_exist = True + break + if not specific_stored_procedure_exist: + err(f"Stored Procedure {identifier} not found on {graph_id}.") -@delete.command -@click.option( - "-t", - "--type", - required=True, - help="Vertex or edge type", -) -@click.option( - "-s", - "--source_vertex_type", - required=False, - help="Source vertex type of the edge [edge only]", -) -@click.option( - "-d", - "--destination_vertex_type", - required=False, - help="Destination vertex type of the edge [edge only]", -) -def datasource(type, source_vertex_type, destination_vertex_type): # noqa: F811 - """Unbind data source mapping on vertex or edge type""" +@service.command +def stop(): # noqa: F811 + """Stop current database service""" try: - current_context = get_current_context() - graph_identifier = current_context.context - if source_vertex_type is not None and destination_vertex_type is not None: - unbind_edge_datasource( - graph_identifier, type, source_vertex_type, destination_vertex_type - ) - else: - unbind_vertex_datasource(graph_identifier, type) + stop_service() except Exception as e: - err(f"Failed to unbind data source: {str(e)}") + err(f"Failed to stop service: {str(e)}") else: - succ("Unbind data source successfully.") + succ("Service stopped.") -@create.command() -@click.option( - "-f", - "--filename", - required=True, - help="Path of yaml file", -) -def loaderjob(filename): # noqa: F811 - """Create a data loading job from file""" - if not is_valid_file_path(filename): - err(f"Invalid file: {filename}") - return - current_context = get_current_context() - graph_identifier = current_context.context +@service.command +def start(): # noqa: F811 + """Start current database service""" try: - config = read_yaml_file(filename) - jobid = submit_dataloading_job(graph_identifier, config) + current_context = get_current_context() + graph_identifier = current_context.context + + status = list_service_status() + for s in status: + if s.graph_id == graph_identifier: + if s.status != "Running": + info(f"Starting service on graph {graph_identifier}...") + start_service(graph_identifier) + succ("Service restarted.") + else: + info("Service is running...") except Exception as e: - err(f"Failed to create a job: {str(e)}") - else: - succ(f"Create job {jobid} successfully.") + err(f"Failed to start service: {str(e)}") -@delete.command() -@click.argument("identifier", required=True) -def job(identifier): # noqa: F811 - """Cancel a job, see identifier with `ls` command""" +@service.command +def restart(): # noqa: F811 + """Start current database service""" try: - delete_job_by_id(identifier) + restart_service() except Exception as e: - err(f"Failed to delete job {identifier}: {str(e)}") + err(f"Failed to restart service: {str(e)}") else: - succ(f"Delete job {identifier} successfully.") + succ("Service restarted.") -@desc.command() -@click.argument("identifier", required=True) -def job(identifier): # 
noqa: F811 - """Show details of job, see identifier with `ls` command""" - try: - job = get_job_by_id(identifier) - except Exception as e: - err(f"Failed to get job: {str(e)}") - else: - info(yaml.dump(job.to_dict())) +@service.command +def status(): # noqa: F811 + """Display current service status""" + def _construct_and_display_data(status): + current_context = get_current_context() + graph_identifier = current_context.context + graph_name = current_context.graph_name + + head = [ + "STATUS", + "SERVING_GRAPH(IDENTIFIER)", + "CYPHER_ENDPOINT", + "HQPS_ENDPOINT", + "GREMLIN_ENDPOINT", + ] + data = [head] + for s in status: + if s.graph_id == graph_identifier: + if s.status == "Stopped": + data.append( + [s.status, f"{graph_name}(id={s.graph_id})", "-", "-", "-"] + ) + else: + data.append( + [ + s.status, + f"{graph_name}(id={s.graph_id})", + s.sdk_endpoints.cypher, + s.sdk_endpoints.hqps, + s.sdk_endpoints.gremlin, + ] + ) + terminal_display(data) -@desc.command() -@click.argument("identifier", required=True) -def storedproc(identifier): # noqa: F811 - """Show details of stored procedure, see identifier with `ls` command""" - current_context = get_current_context() - graph_id = current_context.context try: - stored_procedures = list_stored_procedures(graph_id) + status = list_service_status() except Exception as e: - err(f"Failed to list stored procedures: {str(e)}") + err(f"Failed to list service status: {str(e)}") else: - if not stored_procedures: - info(f"No stored procedures found on {graph_id}.") - return - specific_stored_procedure_exist = False - for stored_procedure in stored_procedures: - if identifier == stored_procedure.id: - info(yaml.dump(stored_procedure.to_dict())) - specific_stored_procedure_exist = True - break - if not specific_stored_procedure_exist: - err(f"Stored Procedure {identifier} not found on {graph_id}.") + _construct_and_display_data(status) @use.command(name="GLOBAL") diff --git a/python/graphscope/gsctl/config.py b/python/graphscope/gsctl/config.py index 4eaa4dc3d566..067b8660a8ee 100644 --- a/python/graphscope/gsctl/config.py +++ b/python/graphscope/gsctl/config.py @@ -46,6 +46,7 @@ def __init__( name=None, timestamp=time.time(), context="global", + graph_name=None, ): if name is None: name = "context_" + "".join(random.choices(ascii_letters, k=8)) @@ -55,14 +56,19 @@ def __init__( self.coordinator_endpoint = coordinator_endpoint # switch to specific graph after `using graph` self.context = context + self.graph_name = graph_name self.timestamp = timestamp def switch_context(self, context: str): self.context = context + def set_graph_name(self, graph_name: str): + self.graph_name = graph_name + def to_dict(self) -> dict: return { "name": self.name, + "graph_name": str(self.graph_name), "flex": self.flex, "coordinator_endpoint": self.coordinator_endpoint, "context": self.context, @@ -77,6 +83,7 @@ def from_dict(cls, dikt): name=dikt.get("name"), timestamp=dikt.get("timestamp"), context=dikt.get("context"), + graph_name=dikt.get("graph_name", None), ) def is_expired(self, validity_period=86400) -> bool: diff --git a/python/graphscope/gsctl/impl/__init__.py b/python/graphscope/gsctl/impl/__init__.py index 12677c62629e..7fdd2ff4e37a 100644 --- a/python/graphscope/gsctl/impl/__init__.py +++ b/python/graphscope/gsctl/impl/__init__.py @@ -25,6 +25,7 @@ from graphscope.gsctl.impl.graph import create_graph from graphscope.gsctl.impl.graph import delete_graph_by_id from graphscope.gsctl.impl.graph import get_graph_id_by_name +from graphscope.gsctl.impl.graph import 
get_graph_name_by_id from graphscope.gsctl.impl.graph import list_graphs from graphscope.gsctl.impl.job import delete_job_by_id from graphscope.gsctl.impl.job import get_job_by_id diff --git a/python/graphscope/gsctl/impl/graph.py b/python/graphscope/gsctl/impl/graph.py index 06399e7968d3..269b4a56e9ca 100644 --- a/python/graphscope/gsctl/impl/graph.py +++ b/python/graphscope/gsctl/impl/graph.py @@ -70,3 +70,11 @@ def get_graph_id_by_name(name_or_id: str): f"Found multiple id candidates {id_candidate} for graph {name_or_id}, please choose one." ) return id_candidate[0] + + +def get_graph_name_by_id(graph_identifier: str): + graphs = list_graphs() + for g in graphs: + if g.id == graph_identifier: + return g.name + return graph_identifier diff --git a/python/graphscope/gsctl/impl/utils.py b/python/graphscope/gsctl/impl/utils.py index cd30821bddfe..977cbf38e755 100644 --- a/python/graphscope/gsctl/impl/utils.py +++ b/python/graphscope/gsctl/impl/utils.py @@ -30,8 +30,9 @@ def upload_file(location: str) -> str: return api_instance.upload_file(location).file_path -def switch_context(context: str): +def switch_context(context: str, graph_name=None): config = load_gs_config() current_context = get_current_context() current_context.switch_context(context) + current_context.set_graph_name(graph_name) config.update_and_write(current_context) diff --git a/python/graphscope/gsctl/utils.py b/python/graphscope/gsctl/utils.py index 167cbe20839a..f679cbc5630b 100644 --- a/python/graphscope/gsctl/utils.py +++ b/python/graphscope/gsctl/utils.py @@ -459,18 +459,19 @@ def create_datasource_mapping_node(self, graph, datasource_mapping): parent=specific_edge_mapping_identifier, ) # property mapping - for property_column_mapping in mapping.column_mappings: - tag = "Property(name: {0}) -> DataSourceColumn(index: {1}, name: {2})".format( - property_column_mapping.var_property, - property_column_mapping.column.index, - property_column_mapping.column.name, - ) - p_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}" - self.tree.create_node( - tag=tag, - identifier=p_mapping_identifier, - parent=specific_edge_mapping_identifier, - ) + if mapping.column_mappings is not None: + for property_column_mapping in mapping.column_mappings: + tag = "Property(name: {0}) -> DataSourceColumn(index: {1}, name: {2})".format( + property_column_mapping.var_property, + property_column_mapping.column.index, + property_column_mapping.column.name, + ) + p_mapping_identifier = f"{specific_edge_mapping_identifier}_{property_column_mapping.var_property}" + self.tree.create_node( + tag=tag, + identifier=p_mapping_identifier, + parent=specific_edge_mapping_identifier, + ) def show(self, graph_identifier=None, stdout=False, sorting=False): if graph_identifier is not None: @@ -483,7 +484,5 @@ def show(self, graph_identifier=None, stdout=False, sorting=False): f"{graph_identifier}_stored_procedure" ) click.secho(stored_procedure_tree.show(stdout=False, sorting=False)) - job_tree = self.tree.subtree(f"{graph_identifier}_job") - click.secho(job_tree.show(stdout=False, sorting=False)) else: click.secho(self.tree.show(stdout=stdout, sorting=sorting))
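Taken together, the behavioral change behind `gsctl use GRAPH` is easiest to see as a short sketch assembled from the helpers touched in this patch (`get_graph_id_by_name`, `get_graph_name_by_id`, `list_service_status`, `start_service`, `switch_context`). This is a condensed illustration, not the actual click command in `python/graphscope/gsctl/commands/interactive/glob.py`, and it omits error handling and console output:

```python
from graphscope.gsctl.impl import (
    get_graph_id_by_name,
    get_graph_name_by_id,
    list_service_status,
    start_service,
    switch_context,
)


def use_graph(identifier: str) -> None:
    """Switch to a GRAPH context, starting its service first if it is not running."""
    graph_id = get_graph_id_by_name(identifier)  # accepts a graph name or id
    graph_name = get_graph_name_by_id(graph_id)
    for s in list_service_status():
        if s.graph_id == graph_id and s.status != "Running":
            # New in this patch: the service is started automatically on switch.
            start_service(graph_id)
    # New in this patch: the context records the graph name alongside its id,
    # so later commands can display GRAPH_NAME(GRAPH_ID).
    switch_context(graph_id, graph_name)
```

With this in place, switching to a graph both brings its service up and makes subsequent `gsctl service status` output label the serving graph as `GRAPH_NAME(id=GRAPH_ID)`.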