Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes and Enhancements for PostgreSQL and JSON Document Storage #837

Merged
merged 16 commits into from
Feb 18, 2025
2 changes: 1 addition & 1 deletion lightrag/api/docs/LightRagWithPostGRESQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Replace placeholders like `your_role_name`, `your_password`, and `your_database`
Start the LightRAG server using specified options:

```bash
lightrag-server --port 9626 --key sk-SL1 --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
```

Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
Expand Down
4 changes: 4 additions & 0 deletions lightrag/kg/json_doc_status_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ async def delete(self, doc_ids: list[str]):
for doc_id in doc_ids:
self._data.pop(doc_id, None)
await self.index_done_callback()

async def drop(self) -> None:
"""Drop the storage"""
self._data.clear()
77 changes: 64 additions & 13 deletions lightrag/kg/postgres_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ async def filter_keys(self, keys: set[str]) -> set[str]:
exist_keys = [key["id"] for key in res]
else:
exist_keys = []
data = set([s for s in keys if s not in exist_keys])
return data
new_keys = set([s for s in keys if s not in exist_keys])
return new_keys
except Exception as e:
logger.error(f"PostgreSQL database error: {e}")
print(sql)
Expand Down Expand Up @@ -301,6 +301,11 @@ async def index_done_callback(self) -> None:
# PG handles persistence automatically
pass

async def drop(self) -> None:
"""Drop the storage"""
drop_sql = SQL_TEMPLATES["drop_all"]
await self.db.execute(drop_sql)


@final
@dataclass
Expand Down Expand Up @@ -432,16 +437,26 @@ async def delete_entity_relation(self, entity_name: str) -> None:
@dataclass
class PGDocStatusStorage(DocStatusStorage):
async def filter_keys(self, keys: set[str]) -> set[str]:
"""Return keys that don't exist in storage"""
keys = ",".join([f"'{_id}'" for _id in keys])
sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace='{self.db.workspace}' AND id IN ({keys})"
result = await self.db.query(sql, multirows=True)
# The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
if result is None:
return set(keys)
else:
existed = set([element["id"] for element in result])
return set(keys) - existed
"""Filter out duplicated content"""
sql = SQL_TEMPLATES["filter_keys"].format(
table_name=namespace_to_table_name(self.namespace),
ids=",".join([f"'{id}'" for id in keys]),
)
params = {"workspace": self.db.workspace}
try:
res = await self.db.query(sql, params, multirows=True)
if res:
exist_keys = [key["id"] for key in res]
else:
exist_keys = []
new_keys = set([s for s in keys if s not in exist_keys])
print(f"keys: {keys}")
Copy link
Contributor

@YanSte YanSte Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remove all the print. 🙏🏻

print(f"new_keys: {new_keys}")
return new_keys
except Exception as e:
logger.error(f"PostgreSQL database error: {e}")
print(sql)
print(params)

async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
Expand Down Expand Up @@ -483,7 +498,7 @@ async def get_docs_by_status(
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
params = {"workspace": self.db.workspace, "status": status.value}
result = await self.db.query(sql, params, True)
return {
docs_by_status = {
element["id"]: DocProcessingStatus(
content=result[0]["content"],
content_summary=element["content_summary"],
Expand All @@ -495,6 +510,7 @@ async def get_docs_by_status(
)
for element in result
}
return docs_by_status

async def index_done_callback(self) -> None:
# PG handles persistence automatically
Expand Down Expand Up @@ -531,6 +547,11 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
)
return data

async def drop(self) -> None:
"""Drop the storage"""
drop_sql = SQL_TEMPLATES["drop_doc_full"]
await self.db.execute(drop_sql)


class PGGraphQueryException(Exception):
"""Exception for the AGE queries."""
Expand Down Expand Up @@ -1012,6 +1033,13 @@ async def get_knowledge_graph(
) -> KnowledgeGraph:
raise NotImplementedError

async def drop(self) -> None:
"""Drop the storage"""
drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
await self.db.execute(drop_sql)
drop_sql = SQL_TEMPLATES["drop_vdb_relation"]
await self.db.execute(drop_sql)


NAMESPACE_TABLE_MAP = {
NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
Expand Down Expand Up @@ -1194,4 +1222,27 @@ def namespace_to_table_name(namespace: str) -> str:
FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
WHERE distance>$2 ORDER BY distance DESC LIMIT $3
""",
# DROP tables
"drop_all": """
DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
""",
"drop_doc_full": """
DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
""",
"drop_doc_chunks": """
DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
""",
"drop_llm_cache": """
DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
""",
"drop_vdb_entity": """
DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
""",
"drop_vdb_relation": """
DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
""",
}