Skip to content

Commit

Permalink
fix(interactive): prepare more graphs for groot in ci tests (#4180)
Browse files Browse the repository at this point in the history
  • Loading branch information
BingqingLyu authored Aug 22, 2024
1 parent 0de3c02 commit c2d15dd
Showing 1 changed file with 207 additions and 7 deletions.
214 changes: 207 additions & 7 deletions interactive_engine/groot-server/src/main/resources/import_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

import os
import argparse

import pandas as pd
import graphscope as gs
Expand Down Expand Up @@ -51,18 +52,18 @@ def get_conn():

def create_modern_graph_schema(graph):
    """Define the TinkerPop "modern" graph schema on *graph*.

    Vertices: person(id:int pk, name, age), software(id:int pk, name, lang).
    Edges: knows(person->person), created(person->software), each carrying
    an int ``edge_id`` and a double ``weight``.

    NOTE(review): this block was a diff-page scrape containing both the old
    ("long") and new ("int") versions of several lines; this is the
    post-change version with the stale pre-change duplicates removed.
    """
    schema = graph.schema()
    schema.add_vertex_label("person").add_primary_key("id", "int").add_property(
        "name", "str"
    ).add_property("age", "int")
    schema.add_vertex_label("software").add_primary_key("id", "int").add_property(
        "name", "str"
    ).add_property("lang", "str")
    schema.add_edge_label("knows").source("person").destination("person").add_property(
        "edge_id", "int"
    ).add_property("weight", "double")
    schema.add_edge_label("created").source("person").destination(
        "software"
    ).add_property("edge_id", "int").add_property("weight", "double")
    schema.update()


Expand All @@ -85,6 +86,45 @@ def create_crew_graph_schema(graph):
).add_property("skill", "int")
schema.update()

def create_ldbc_graph_schema(graph):
    """Define the LDBC SNB schema (vertex and edge labels) on *graph*.

    Every vertex label uses a 'long' primary key named 'id'.  Endpoint
    pairs and property lists mirror the LDBC social-network benchmark.
    """
    # (label, [(property, type), ...]) — declared in the original order.
    vertex_labels = [
        ('PLACE', [('name', 'str'), ('url', 'str'), ('type', 'str')]),
        ('PERSON', [('firstName', 'str'), ('lastName', 'str'), ('gender', 'str'),
                    ('birthday', 'long'), ('creationDate', 'long'),
                    ('locationIP', 'str'), ('browserUsed', 'str'),
                    ('language', 'str'), ('email', 'str')]),
        ('COMMENT', [('creationDate', 'long'), ('locationIP', 'str'),
                     ('browserUsed', 'str'), ('content', 'str'), ('length', 'int')]),
        ('POST', [('imageFile', 'str'), ('creationDate', 'long'),
                  ('locationIP', 'str'), ('browserUsed', 'str'),
                  ('language', 'str'), ('content', 'str'), ('length', 'int')]),
        ('FORUM', [('title', 'str'), ('creationDate', 'str')]),
        ('ORGANISATION', [('type', 'str'), ('name', 'str'), ('url', 'str')]),
        ('TAGCLASS', [('name', 'str'), ('url', 'str')]),
        ('TAG', [('name', 'str'), ('url', 'str')]),
    ]
    # (label, [(source, destination), ...], [(property, type), ...])
    edge_labels = [
        ('HASCREATOR', [('COMMENT', 'PERSON'), ('POST', 'PERSON')], []),
        ('HASTAG', [('COMMENT', 'TAG'), ('POST', 'TAG'), ('FORUM', 'TAG')], []),
        ('ISLOCATEDIN', [('COMMENT', 'PLACE'), ('POST', 'PLACE'),
                         ('PERSON', 'PLACE'), ('ORGANISATION', 'PLACE')], []),
        ('REPLYOF', [('COMMENT', 'COMMENT'), ('COMMENT', 'POST')], []),
        ('CONTAINEROF', [('FORUM', 'POST')], []),
        ('HASMEMBER', [('FORUM', 'PERSON')], [('joinDate', 'long')]),
        ('HASMODERATOR', [('FORUM', 'PERSON')], []),
        ('HASINTEREST', [('PERSON', 'TAG')], []),
        ('KNOWS', [('PERSON', 'PERSON')], [('creationDate', 'long')]),
        ('LIKES', [('PERSON', 'COMMENT'), ('PERSON', 'POST')],
         [('creationDate', 'long')]),
        ('STUDYAT', [('PERSON', 'ORGANISATION')], [('classYear', 'long')]),
        ('WORKAT', [('PERSON', 'ORGANISATION')], [('workFrom', 'long')]),
        ('ISPARTOF', [('PLACE', 'PLACE')], []),
        ('ISSUBCLASSOF', [('TAGCLASS', 'TAGCLASS')], []),
        ('HASTYPE', [('TAG', 'TAGCLASS')], []),
    ]

    schema = graph.schema()
    for label, props in vertex_labels:
        builder = schema.add_vertex_label(label).add_primary_key('id', 'long')
        for prop_name, prop_type in props:
            builder = builder.add_property(prop_name, prop_type)
    for label, endpoints, props in edge_labels:
        builder = schema.add_edge_label(label)
        for src, dst in endpoints:
            builder = builder.source(src).destination(dst)
        for prop_name, prop_type in props:
            builder = builder.add_property(prop_name, prop_type)
    schema.update()

def create_movie_graph_schema(graph):
    """Define the movie graph schema (Movie/Person/User plus relation edges)."""
    schema = graph.schema()
    # Vertex labels: (label, [(property, type), ...]); pk is always ('id', 'int').
    for label, props in [
        ('Movie', [('released', 'int'), ('tagline', 'str'), ('title', 'str')]),
        ('Person', [('born', 'int'), ('name', 'str')]),
        ('User', [('born', 'int'), ('name', 'str')]),
    ]:
        builder = schema.add_vertex_label(label).add_primary_key('id', 'int')
        for prop_name, prop_type in props:
            builder = builder.add_property(prop_name, prop_type)
    # Edge labels: (label, source, destination, [(property, type), ...]).
    for label, src, dst, props in [
        ('ACTED_IN', 'Person', 'Movie', []),
        ('DIRECTED', 'Person', 'Movie', []),
        ('REVIEW', 'Person', 'Movie', [('rating', 'int')]),
        ('FOLLOWS', 'User', 'Person', []),
        ('WROTE', 'Person', 'Movie', []),
        ('PRODUCED', 'Person', 'Movie', []),
    ]:
        builder = schema.add_edge_label(label).source(src).destination(dst)
        for prop_name, prop_type in props:
            builder = builder.add_property(prop_name, prop_type)
    schema.update()

def load_data_of_modern_graph(conn, graph, prefix):
person = pd.read_csv(os.path.join(prefix, "person.csv"), sep="|")
Expand Down Expand Up @@ -203,6 +243,137 @@ def load_data_of_crew_graph(conn, graph, prefix):
assert conn.remote_flush(snapshot_id, timeout_ms=5000)
print("load crew graph done")

def batch_insert_vertices(conn, graph, vertices, batch_size=10000):
    """Insert *vertices* into *graph* in slices of *batch_size* records.

    Each slice is flushed (and asserted visible) before the next is sent.
    """
    total = len(vertices)
    start = 0
    while start < total:
        chunk = vertices[start:start + batch_size]
        snapshot_id = graph.insert_vertices(chunk)
        # Block until this batch is durable/visible before continuing.
        assert conn.remote_flush(snapshot_id, timeout_ms=5000)
        start += batch_size

def batch_insert_edges(conn, graph, edges, batch_size=10000):
    """Insert *edges* into *graph* in slices of *batch_size* records.

    Each slice is flushed (and asserted visible) before the next is sent.
    """
    total = len(edges)
    start = 0
    while start < total:
        chunk = edges[start:start + batch_size]
        snapshot_id = graph.insert_edges(chunk)
        # Block until this batch is durable/visible before continuing.
        assert conn.remote_flush(snapshot_id, timeout_ms=5000)
        start += batch_size


def prepare_vertices(vertices, data, vertex_type, properties):
    """Append ``[VertexRecordKey, props-dict]`` records built from *data*.

    Column 0 of each row is the primary key 'id'; the remaining columns are
    matched positionally against *properties*.
    """
    for row in data.itertuples(index=False):
        props = {name: row[idx + 1] for idx, name in enumerate(properties)}
        vertices.append([VertexRecordKey(vertex_type, {"id": row[0]}), props])

def prepare_edges(edges, data, edge_type, source_type, destination_type, properties):
    """Append ``[EdgeRecordKey, props-dict]`` records built from *data*.

    Columns 0 and 1 of each row are the source and destination primary keys;
    the remaining columns are matched positionally against *properties*.
    """
    for row in data.itertuples(index=False):
        key = EdgeRecordKey(
            edge_type,
            VertexRecordKey(source_type, {"id": row[0]}),
            VertexRecordKey(destination_type, {"id": row[1]}),
        )
        props = {name: row[idx + 2] for idx, name in enumerate(properties)}
        edges.append([key, props])


def load_data_of_ldbc_graph(conn, graph, prefix):
    """Read the LDBC SNB CSV files under *prefix* and bulk-load them into *graph*."""

    def read_csv(name):
        # All LDBC exports are pipe-separated.
        return pd.read_csv(os.path.join(prefix, name), sep="|")

    # Vertex files: (csv name, label, property columns after the id column).
    vertex_files = [
        ("place_0_0.csv", "PLACE", ["name", "url", "type"]),
        ("person_0_0.csv", "PERSON", ["firstName", "lastName", "gender", "birthday",
                                      "creationDate", "locationIP", "browserUsed",
                                      "language", "email"]),
        ("comment_0_0.csv", "COMMENT", ["creationDate", "locationIP", "browserUsed",
                                        "content", "length"]),
        ("post_0_0.csv", "POST", ["imageFile", "creationDate", "locationIP",
                                  "browserUsed", "language", "content", "length"]),
        ("forum_0_0.csv", "FORUM", ["title", "creationDate"]),
        ("organisation_0_0.csv", "ORGANISATION", ["type", "name", "url"]),
        ("tagclass_0_0.csv", "TAGCLASS", ["name", "url"]),
        ("tag_0_0.csv", "TAG", ["name", "url"]),
    ]
    # Edge files: (csv name, label, source label, destination label, properties).
    edge_files = [
        ("comment_hasCreator_person_0_0.csv", "HASCREATOR", "COMMENT", "PERSON", []),
        ("post_hasCreator_person_0_0.csv", "HASCREATOR", "POST", "PERSON", []),
        ("comment_hasTag_tag_0_0.csv", "HASTAG", "COMMENT", "TAG", []),
        ("post_hasTag_tag_0_0.csv", "HASTAG", "POST", "TAG", []),
        ("forum_hasTag_tag_0_0.csv", "HASTAG", "FORUM", "TAG", []),
        ("comment_isLocatedIn_place_0_0.csv", "ISLOCATEDIN", "COMMENT", "PLACE", []),
        ("post_isLocatedIn_place_0_0.csv", "ISLOCATEDIN", "POST", "PLACE", []),
        ("person_isLocatedIn_place_0_0.csv", "ISLOCATEDIN", "PERSON", "PLACE", []),
        ("organisation_isLocatedIn_place_0_0.csv", "ISLOCATEDIN", "ORGANISATION", "PLACE", []),
        ("comment_replyOf_comment_0_0.csv", "REPLYOF", "COMMENT", "COMMENT", []),
        ("comment_replyOf_post_0_0.csv", "REPLYOF", "COMMENT", "POST", []),
        ("forum_containerOf_post_0_0.csv", "CONTAINEROF", "FORUM", "POST", []),
        ("forum_hasMember_person_0_0.csv", "HASMEMBER", "FORUM", "PERSON", ["joinDate"]),
        ("forum_hasModerator_person_0_0.csv", "HASMODERATOR", "FORUM", "PERSON", []),
        ("person_hasInterest_tag_0_0.csv", "HASINTEREST", "PERSON", "TAG", []),
        ("person_knows_person_0_0.csv", "KNOWS", "PERSON", "PERSON", ["creationDate"]),
        ("person_likes_comment_0_0.csv", "LIKES", "PERSON", "COMMENT", ["creationDate"]),
        ("person_likes_post_0_0.csv", "LIKES", "PERSON", "POST", ["creationDate"]),
        ("person_studyAt_organisation_0_0.csv", "STUDYAT", "PERSON", "ORGANISATION", ["classYear"]),
        ("person_workAt_organisation_0_0.csv", "WORKAT", "PERSON", "ORGANISATION", ["workFrom"]),
        ("place_isPartOf_place_0_0.csv", "ISPARTOF", "PLACE", "PLACE", []),
        ("tagclass_isSubclassOf_tagclass_0_0.csv", "ISSUBCLASSOF", "TAGCLASS", "TAGCLASS", []),
        ("tag_hasType_tagclass_0_0.csv", "HASTYPE", "TAG", "TAGCLASS", []),
    ]

    vertices = []
    for fname, label, props in vertex_files:
        prepare_vertices(vertices, read_csv(fname), label, props)
    edges = []
    for fname, label, src, dst, props in edge_files:
        prepare_edges(edges, read_csv(fname), label, src, dst, props)

    # Batched inserts flush after every chunk, so each write is visible
    # before the next is sent (and vertices are in place before edges).
    batch_insert_vertices(conn, graph, vertices)
    batch_insert_edges(conn, graph, edges)
    print("load ldbc graph done")

def load_data_of_movie_graph(conn, graph, prefix):
    """Read the movie-graph CSV files under *prefix* and load them into *graph*.

    Fix: the original called ``graph.insert_vertices`` and
    ``graph.insert_edges`` back-to-back, overwriting ``snapshot_id`` and
    flushing only the edge snapshot — the vertex write was never awaited
    before the edges referencing those vertices were inserted.  Use the
    shared batch helpers (as the LDBC loader does), which flush after every
    insert.
    """
    movie = pd.read_csv(os.path.join(prefix, "Movie.csv"), sep="|")
    person = pd.read_csv(os.path.join(prefix, "Person.csv"), sep="|")
    user = pd.read_csv(os.path.join(prefix, "User.csv"), sep="|")
    acted_in = pd.read_csv(os.path.join(prefix, "ACTED_IN.csv"), sep="|")
    directed = pd.read_csv(os.path.join(prefix, "DIRECTED.csv"), sep="|")
    # NOTE(review): REVIEWED.csv feeds the 'REVIEW' edge label — the file
    # name and label differ; presumably intentional (Neo4j movie dataset).
    review = pd.read_csv(os.path.join(prefix, "REVIEWED.csv"), sep="|")
    follows = pd.read_csv(os.path.join(prefix, "FOLLOWS.csv"), sep="|")
    wrote = pd.read_csv(os.path.join(prefix, "WROTE.csv"), sep="|")
    produced = pd.read_csv(os.path.join(prefix, "PRODUCED.csv"), sep="|")
    vertices = []
    prepare_vertices(vertices, movie, "Movie", ["released", "tagline", "title"])
    prepare_vertices(vertices, person, "Person", ["born", "name"])
    prepare_vertices(vertices, user, "User", ["born", "name"])
    edges = []
    prepare_edges(edges, acted_in, "ACTED_IN", "Person", "Movie", [])
    prepare_edges(edges, directed, "DIRECTED", "Person", "Movie", [])
    prepare_edges(edges, review, "REVIEW", "Person", "Movie", ["rating"])
    prepare_edges(edges, follows, "FOLLOWS", "User", "Person", [])
    prepare_edges(edges, wrote, "WROTE", "Person", "Movie", [])
    prepare_edges(edges, produced, "PRODUCED", "Person", "Movie", [])
    # Flush vertices before inserting edges so the edge endpoints exist.
    batch_insert_vertices(conn, graph, vertices)
    batch_insert_edges(conn, graph, edges)
    print("load movie graph done")


def create_modern_graph(conn, graph, client):
create_modern_graph_schema(graph)
Expand All @@ -215,10 +386,39 @@ def create_crew_graph(conn, graph, client):
load_data_of_crew_graph(conn, graph, "/home/graphscope/crew")
statistics(client)

def create_ldbc_graph(conn, graph, client):
    """Create the LDBC schema, load its CSV data, and refresh statistics.

    Data is read from the fixed CI path ``/home/graphscope/ldbc``.
    """
    create_ldbc_graph_schema(graph)
    load_data_of_ldbc_graph(conn, graph, "/home/graphscope/ldbc")
    statistics(client)

def create_movie_graph(conn, graph, client):
    """Create the movie-graph schema, load its CSV data, and refresh statistics.

    Data is read from the fixed CI path ``/home/graphscope/movies``.
    """
    create_movie_graph_schema(graph)
    load_data_of_movie_graph(conn, graph, "/home/graphscope/movies")
    statistics(client)


def main():
    """Parse ``--graph`` and import the selected dataset into groot.

    NOTE(review): this region was a diff-page scrape with stale pre-change
    lines interleaved (an old ``if __name__`` guard and an unconditional
    ``create_modern_graph`` call); this is the reconstructed post-change
    version that dispatches solely on the ``--graph`` argument.
    """
    client = get_client()
    conn = get_conn()
    graph = conn.g()

    parser = argparse.ArgumentParser(description="Import specific graph data.")
    parser.add_argument(
        '--graph',
        choices=['modern', 'crew', 'ldbc', 'movie'],
        required=True,
        help="The graph to import: 'modern', 'crew', 'ldbc', or 'movie'."
    )

    args = parser.parse_args()

    if args.graph == 'modern':
        create_modern_graph(conn, graph, client)
    elif args.graph == 'crew':
        create_crew_graph(conn, graph, client)
    elif args.graph == 'ldbc':
        create_ldbc_graph(conn, graph, client)
    elif args.graph == 'movie':
        create_movie_graph(conn, graph, client)


if __name__ == "__main__":
    main()

0 comments on commit c2d15dd

Please sign in to comment.