Skip to content

Commit

Permalink
chore: Add comments to graphiti methods
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-paliychuk committed Aug 26, 2024
1 parent 0ed7739 commit 60b53b9
Showing 1 changed file with 222 additions and 2 deletions.
224 changes: 222 additions & 2 deletions graphiti_core/graphiti.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@

class Graphiti:
def __init__(self, uri: str, user: str, password: str, llm_client: LLMClient | None = None):
"""
Initialize a Graphiti instance.
This constructor sets up a connection to the Neo4j database and initializes
the LLM client for natural language processing tasks.
Parameters
----------
uri : str
The URI of the Neo4j database.
user : str
The username for authenticating with the Neo4j database.
password : str
The password for authenticating with the Neo4j database.
llm_client : LLMClient | None, optional
An instance of LLMClient for natural language processing tasks.
If not provided, a default OpenAIClient will be initialized.
Returns
-------
None
Notes
-----
This method establishes a connection to the Neo4j database using the provided
credentials. It also sets up the LLM client, either using the provided client
or by creating a default OpenAIClient.
The default database name is set to 'neo4j'. If a different database name
is required, it should be specified in the URI or set separately after
initialization.
The OpenAI API key is expected to be set in the environment variables.
Make sure to set the OPENAI_API_KEY environment variable before initializing
Graphiti if you're using the default OpenAIClient.
"""
self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
self.database = 'neo4j'
if llm_client:
Expand All @@ -79,17 +115,97 @@ def __init__(self, uri: str, user: str, password: str, llm_client: LLMClient | N
)

def close(self):
"""
Close the connection to the Neo4j database.
This method safely closes the driver connection to the Neo4j database.
It should be called when the Graphiti instance is no longer needed or
when the application is shutting down.
Parameters
----------
None
Returns
-------
None
Notes
-----
It's important to close the driver connection to release system resources
and ensure that all pending transactions are completed or rolled back.
This method should be called as part of a cleanup process, potentially
in a context manager or a shutdown hook.
Example:
graphiti = Graphiti(uri, user, password)
try:
# Use graphiti...
finally:
graphiti.close()
self.driver.close()
"""

async def build_indices_and_constraints(self):
"""
Build indices and constraints in the Neo4j database.
This method sets up the necessary indices and constraints in the Neo4j database
to optimize query performance and ensure data integrity for the knowledge graph.
Parameters
----------
None
Returns
-------
None
Notes
-----
This method should typically be called once during the initial setup of the
knowledge graph or when updating the database schema. It uses the
`build_indices_and_constraints` function from the
`graphiti_core.utils.maintenance.graph_data_operations` module to perform
the actual database operations.
The specific indices and constraints created depend on the implementation
of the `build_indices_and_constraints` function. Refer to that function's
documentation for details on the exact database schema modifications.
Caution: Running this method on a large existing database may take some time
and could impact database performance during execution.
"""
await build_indices_and_constraints(self.driver)

async def retrieve_episodes(
self,
reference_time: datetime,
last_n: int = EPISODE_WINDOW_LEN,
) -> list[EpisodicNode]:
"""Retrieve the last n episodic nodes from the graph"""
"""
Retrieve the last n episodic nodes from the graph.
This method fetches a specified number of the most recent episodic nodes
from the graph, relative to the given reference time.
Parameters
----------
reference_time : datetime
The reference time to retrieve episodes before.
last_n : int, optional
The number of episodes to retrieve. Defaults to EPISODE_WINDOW_LEN.
Returns
-------
list[EpisodicNode]
A list of the most recent EpisodicNode objects.
Notes
-----
The actual retrieval is performed by the `retrieve_episodes` function
from the `graphiti_core.utils` module.
"""
return await retrieve_episodes(self.driver, reference_time, last_n)

async def add_episode(
Expand All @@ -102,7 +218,50 @@ async def add_episode(
success_callback: Callable | None = None,
error_callback: Callable | None = None,
):
"""Process an episode and update the graph"""
"""
Process an episode and update the graph.
This method extracts information from the episode, creates nodes and edges,
and updates the graph database accordingly.
Parameters
----------
name : str
The name of the episode.
episode_body : str
The content of the episode.
source_description : str
A description of the episode's source.
reference_time : datetime
The reference time for the episode.
source : EpisodeType, optional
The type of the episode. Defaults to EpisodeType.message.
success_callback : Callable | None, optional
A callback function to be called upon successful processing.
error_callback : Callable | None, optional
A callback function to be called if an error occurs during processing.
Returns
-------
None
Notes
-----
This method performs several steps including node extraction, edge extraction,
deduplication, and database updates. It also handles embedding generation
and edge invalidation.
It is recommended to run this method as a background process, such as in a queue.
It's important that each episode is added sequentially and awaited before adding
the next one. For web applications, consider using FastAPI's background tasks
or a dedicated task queue like Celery for this purpose.
Example using FastAPI background tasks:
@app.post("/add_episode")
async def add_episode_endpoint(episode_data: EpisodeData):
background_tasks.add_task(graphiti.add_episode, **episode_data.dict())
return {"message": "Episode processing started"}
"""
try:
start = time()

Expand Down Expand Up @@ -255,6 +414,40 @@ async def add_episode_bulk(
self,
bulk_episodes: list[RawEpisode],
):
"""
Process multiple episodes in bulk and update the graph.
This method extracts information from multiple episodes, creates nodes and edges,
and updates the graph database accordingly, all in a single batch operation.
Parameters
----------
bulk_episodes : list[RawEpisode]
A list of RawEpisode objects to be processed and added to the graph.
Returns
-------
None
Notes
-----
This method performs several steps including:
- Saving all episodes to the database
- Retrieving previous episode context for each new episode
- Extracting nodes and edges from all episodes
- Generating embeddings for nodes and edges
- Deduplicating nodes and edges
- Saving nodes, episodic edges, and entity edges to the knowledge graph
This bulk operation is designed for efficiency when processing multiple episodes
at once. However, it's important to ensure that the bulk operation doesn't
overwhelm system resources. Consider implementing rate limiting or chunking for
very large batches of episodes.
Important: This method does not perform edge invalidation or date extraction steps.
If these operations are required, use the `add_episode` method instead for each
individual episode.
"""
try:
start = time()
embedder = self.llm_client.get_embedder()
Expand Down Expand Up @@ -329,6 +522,33 @@ async def add_episode_bulk(
raise e

async def search(self, query: str, num_results=10):
"""
Perform a hybrid search on the knowledge graph.
This method executes a search query on the graph, combining vector and
text-based search techniques to retrieve relevant facts.
Parameters
----------
query : str
The search query string.
num_results : int, optional
The maximum number of results to return. Defaults to 10.
Returns
-------
list
A list of facts (strings) that are relevant to the search query.
Notes
-----
This method uses a SearchConfig with num_episodes set to 0 and
num_results set to the provided num_results parameter. It then calls
the hybrid_search function to perform the actual search operation.
The search is performed using the current date and time as the reference
point for temporal relevance.
"""
search_config = SearchConfig(num_episodes=0, num_results=num_results)
edges = (
await hybrid_search(
Expand Down

0 comments on commit 60b53b9

Please sign in to comment.