This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Merge pull request #824 from Avaiga/feature/#755-append-to-datanodes
Feature/#755 - Open append() method on datanodes
trgiangdo authored Nov 15, 2023
2 parents 5996a91 + 2682eb4 commit 405568c
Showing 25 changed files with 699 additions and 74 deletions.
@@ -164,6 +164,7 @@ def _check_callable(self, data_node_config_id: str, data_node_config: DataNodeCo
],
DataNodeConfig._STORAGE_TYPE_VALUE_SQL: [
DataNodeConfig._REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY,
DataNodeConfig._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY,
],
}

5 changes: 5 additions & 0 deletions src/taipy/core/config/config.schema.json
@@ -178,6 +178,11 @@
"type": "string",
"taipy_function": true
},
"append_query_builder": {
"description": "storage_type: sql specific. A callable function that takes in the data as an input parameter and returns a list of SQL queries to be executed when the append data node method is called.",
"type": "string",
"taipy_function": true
},
"collection_name ": {
"description": "storage_type: mongo_collection specific.",
"type": "string"
9 changes: 8 additions & 1 deletion src/taipy/core/config/data_node_config.py
@@ -120,6 +120,7 @@ class DataNodeConfig(Section):
# SQL
_REQUIRED_READ_QUERY_SQL_PROPERTY = "read_query"
_REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY = "write_query_builder"
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY = "append_query_builder"
# MONGO
_REQUIRED_DB_NAME_MONGO_PROPERTY = "db_name"
_REQUIRED_COLLECTION_NAME_MONGO_PROPERTY = "collection_name"
@@ -207,6 +208,7 @@ class DataNodeConfig(Section):
_OPTIONAL_HOST_SQL_PROPERTY: "localhost",
_OPTIONAL_PORT_SQL_PROPERTY: 1433,
_OPTIONAL_DRIVER_SQL_PROPERTY: "",
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY: None,
_OPTIONAL_FOLDER_PATH_SQLITE_PROPERTY: None,
_OPTIONAL_FILE_EXTENSION_SQLITE_PROPERTY: ".db",
_OPTIONAL_DB_EXTRA_ARGS_SQL_PROPERTY: None,
@@ -867,6 +869,7 @@ def _configure_sql(
db_engine: str,
read_query: str,
write_query_builder: Callable,
append_query_builder: Optional[Callable] = None,
db_username: Optional[str] = None,
db_password: Optional[str] = None,
db_host: Optional[str] = None,
@@ -889,7 +892,9 @@ def _configure_sql(
or *"postgresql"*.
read_query (str): The SQL query string used to read the data from the database.
write_query_builder (Callable): A callback function that takes the data as an input parameter
                and returns a list of SQL queries to be executed when writing data to the data node.
append_query_builder (Optional[Callable]): A callback function that takes the data as an input parameter
and returns a list of SQL queries to be executed when appending data to the data node.
db_username (Optional[str]): The database username. Required by the *"mssql"*, *"mysql"*, and
*"postgresql"* engines.
db_password (Optional[str]): The database password. Required by the *"mssql"*, *"mysql"*, and
@@ -927,6 +932,8 @@ def _configure_sql(
}
)

if append_query_builder is not None:
properties[cls._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY] = append_query_builder
if db_username is not None:
properties[cls._OPTIONAL_DB_USERNAME_SQL_PROPERTY] = db_username
if db_password is not None:
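
For context, here is a minimal configuration sketch using the new optional property. The table name, column names, and builder bodies are hypothetical; only the `append_query_builder` argument itself comes from this change:

```python
from taipy import Config

def write_query_builder(data):
    # Replace the whole table content, then insert the new rows.
    return [
        "DELETE FROM sales",
        ("INSERT INTO sales VALUES (:date, :nb_sales)", data.to_dict(orient="records")),
    ]

def append_query_builder(data):
    # Insert the new rows only; existing rows are kept.
    return [("INSERT INTO sales VALUES (:date, :nb_sales)", data.to_dict(orient="records"))]

sales_history_cfg = Config.configure_sql_data_node(
    id="sales_history",
    db_name="taipy",
    db_engine="sqlite",
    read_query="SELECT * FROM sales",
    write_query_builder=write_query_builder,
    append_query_builder=append_query_builder,
)
```
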
16 changes: 16 additions & 0 deletions src/taipy/core/data/_abstract_sql.py
@@ -289,6 +289,22 @@ def _get_read_query(self, operators: Optional[Union[List, Tuple]] = None, join_o
def _get_base_read_query(self) -> str:
raise NotImplementedError

def _append(self, data) -> None:
engine = self._get_engine()
with engine.connect() as connection:
with connection.begin() as transaction:
try:
self._do_append(data, engine, connection)
except Exception as e:
transaction.rollback()
raise e
else:
transaction.commit()

@abstractmethod
def _do_append(self, data, engine, connection) -> None:
raise NotImplementedError

def _write(self, data) -> None:
"""Check data against a collection of types to handle insertion on the database."""
engine = self._get_engine()
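
Note that `_append` mirrors the existing `_write` template method: transaction handling stays in the base class, and concrete subclasses only supply `_do_append`. A schematic subclass, for illustration only (the real implementers are the SQL data nodes below; the table and columns are hypothetical):

```python
from sqlalchemy import text

class _MyAppendableSQLDataNode(_AbstractSQLDataNode):  # hypothetical subclass
    def _get_base_read_query(self) -> str:
        return "SELECT * FROM my_table"

    def _do_append(self, data, engine, connection) -> None:
        # Runs inside the transaction opened by _append(); raising here triggers a rollback.
        connection.execute(
            text("INSERT INTO my_table (name, value) VALUES (:name, :value)"),
            data,  # a list of dicts matching the bound parameters
        )

    def _do_write(self, data, engine, connection) -> None:
        connection.execute(text("DELETE FROM my_table"))
        self._do_append(data, engine, connection)
```
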
25 changes: 21 additions & 4 deletions src/taipy/core/data/_data_converter.py
@@ -33,6 +33,8 @@ class _DataNodeConverter(_AbstractConverter):
_EXPOSED_TYPE_KEY = "exposed_type"
__WRITE_QUERY_BUILDER_NAME_KEY = "write_query_builder_name"
__WRITE_QUERY_BUILDER_MODULE_KEY = "write_query_builder_module"
__APPEND_QUERY_BUILDER_NAME_KEY = "append_query_builder_name"
__APPEND_QUERY_BUILDER_MODULE_KEY = "append_query_builder_module"
# TODO: This limits the valid string to only the ones provided by the Converter.
# While in practice, each data node might have different exposed type possibilities.
# The previous implementation used tabular datanode but it's no longer suitable so
@@ -71,11 +73,16 @@ def __serialize_json_dn_properties(cls, datanode_properties: dict):

@classmethod
def __serialize_sql_dn_properties(cls, datanode_properties: dict) -> dict:
        write_qb = datanode_properties.get(SQLDataNode._WRITE_QUERY_BUILDER_KEY)
        datanode_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY] = write_qb.__name__ if write_qb else None
        datanode_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY] = write_qb.__module__ if write_qb else None
        datanode_properties.pop(SQLDataNode._WRITE_QUERY_BUILDER_KEY, None)

append_qb = datanode_properties.get(SQLDataNode._APPEND_QUERY_BUILDER_KEY)
datanode_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY] = append_qb.__name__ if append_qb else None
datanode_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY] = append_qb.__module__ if append_qb else None
datanode_properties.pop(SQLDataNode._APPEND_QUERY_BUILDER_KEY, None)

return datanode_properties

@classmethod
@@ -209,7 +216,6 @@ def __deserialize_json_dn_properties(cls, datanode_model_properties: dict) -> di

@classmethod
def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict) -> dict:

if datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._WRITE_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY],
@@ -221,6 +227,17 @@ def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict)
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]

if datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY],
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY],
)
else:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = None

del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]

return datanode_model_properties

@classmethod
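
Since callables cannot be stored in JSON, the converter keeps only the builder's `__name__` and `__module__` and re-imports it on load. A sketch of that round-trip, assuming `_load_fct` behaves like a plain importlib lookup:

```python
import importlib

def load_fct(module_name: str, fct_name: str):
    # Re-import the module and fetch the function by name.
    return getattr(importlib.import_module(module_name), fct_name)

# Serialization would store a pair such as ("my_project.queries", "append_query_builder");
# deserialization restores the callable. A stand-in pair that works out of the box:
builder = load_fct("json", "loads")
```
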
8 changes: 8 additions & 0 deletions src/taipy/core/data/csv.py
@@ -233,6 +233,14 @@ def _read_as_modin_dataframe(
except pd.errors.EmptyDataError:
return modin_pd.DataFrame()

def _append(self, data: Any):
if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
data.to_csv(self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False)
else:
pd.DataFrame(data).to_csv(
self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False
)

def _write(self, data: Any):
if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
data.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
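
The CSV implementation leans entirely on pandas: `mode="a"` with `header=False` adds rows without re-emitting the header. The same behavior in plain pandas, for reference (the file name is illustrative):

```python
import pandas as pd

pd.DataFrame({"a": [1], "b": [2]}).to_csv("data.csv", index=False)  # write: header + rows
pd.DataFrame({"a": [3], "b": [4]}).to_csv("data.csv", mode="a", index=False, header=False)  # append: rows only
print(pd.read_csv("data.csv"))  # two rows under a single header
```
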
20 changes: 20 additions & 0 deletions src/taipy/core/data/data_node.py
@@ -332,6 +332,22 @@ def read(self) -> Any:
)
return None

    def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Append some data to this data node.

        Parameters:
            data (Any): The data to append to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this append.
        """
from ._data_manager_factory import _DataManagerFactory

self._append(data)
self.track_edit(job_id=job_id, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
"""Write some data to this data node.
@@ -438,6 +454,10 @@ def __getitem__(self, item):
def _read(self):
raise NotImplementedError

@abstractmethod
def _append(self, data):
raise NotImplementedError

@abstractmethod
def _write(self, data):
raise NotImplementedError
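
This is the user-facing entry point that the per-storage `_append` implementations plug into. A usage sketch, assuming a globally scoped CSV data node (`Core` and `create_global_data_node` are taken from the taipy 3.x API):

```python
import pandas as pd
import taipy as tp
from taipy import Config
from taipy.config.common.scope import Scope

sales_cfg = Config.configure_csv_data_node(id="sales", default_path="sales.csv", scope=Scope.GLOBAL)

tp.Core().run()
sales = tp.create_global_data_node(sales_cfg)

sales.write(pd.DataFrame({"day": [1, 2], "amount": [10.0, 12.5]}))
sales.append(pd.DataFrame({"day": [3], "amount": [9.9]}))  # keeps the existing rows, adds one
assert len(sales.read()) == 3
```
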
40 changes: 40 additions & 0 deletions src/taipy/core/data/excel.py
@@ -311,6 +311,46 @@ def _read_as_modin_dataframe(
except pd.errors.EmptyDataError:
return modin_pd.DataFrame()

def __append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)

with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
if sheet_name:
if not isinstance(sheet_name, str):
sheet_name = sheet_name[0]
append_excel_fct(
writer, *args, **kwargs, sheet_name=sheet_name, startrow=writer.sheets[sheet_name].max_row
)
else:
sheet_name = list(writer.sheets.keys())[0]
append_excel_fct(writer, *args, **kwargs, startrow=writer.sheets[sheet_name].max_row)

def __append_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
# Each key stands for a sheet name
for sheet_name in data.keys():
if isinstance(data[sheet_name], np.ndarray):
df = pd.DataFrame(data[sheet_name])
else:
df = data[sheet_name]

                if columns:
                    df.columns = columns

df.to_excel(
writer, sheet_name=sheet_name, index=False, header=False, startrow=writer.sheets[sheet_name].max_row
)

def _append(self, data: Any):
if isinstance(data, Dict) and all(
[isinstance(x, (pd.DataFrame, modin_pd.DataFrame, np.ndarray)) for x in data.values()]
):
self.__append_excel_with_multiple_sheets(data)
elif isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
self.__append_excel_with_single_sheet(data.to_excel, index=False, header=False)
else:
self.__append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)

def __write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)
if sheet_name:
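
The key mechanism above is pandas' `if_sheet_exists="overlay"` (available with the openpyxl engine since pandas 1.4) combined with `startrow=max_row`, so new rows land directly under the last populated row. The same idea in isolation (file and sheet names are illustrative):

```python
import pandas as pd

with pd.ExcelWriter("book.xlsx", mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
    start = writer.sheets["Sheet1"].max_row  # last populated row of the existing sheet
    pd.DataFrame([[5, 6]]).to_excel(
        writer, sheet_name="Sheet1", index=False, header=False, startrow=start
    )
```
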
14 changes: 14 additions & 0 deletions src/taipy/core/data/json.py
@@ -183,6 +183,20 @@ def _read(self):
with open(self._path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
return json.load(f, cls=self._decoder)

def _append(self, data: Any):
with open(self._path, "r+", encoding=self.properties[self.__ENCODING_KEY]) as f:
file_data = json.load(f, cls=self._decoder)
if isinstance(file_data, List):
if isinstance(data, List):
file_data.extend(data)
else:
file_data.append(data)
elif isinstance(data, Dict):
file_data.update(data)

            f.seek(0)
            json.dump(file_data, f, indent=4, cls=self._encoder)
            f.truncate()  # drop stale trailing bytes if the updated payload is shorter

def _write(self, data: Any):
with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
json.dump(data, f, indent=4, cls=self._encoder)
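
The `r+` pattern above reads the file, mutates the parsed value in memory, rewinds, and rewrites. A minimal standalone version of the same idea; the final truncate matters whenever the updated payload can serialize shorter than the original:

```python
import json

with open("data.json", "r+", encoding="utf-8") as f:
    current = json.load(f)  # e.g. [1, 2]
    current.extend([3, 4])  # list + list -> extend; dict + dict would use update()
    f.seek(0)
    json.dump(current, f, indent=4)
    f.truncate()  # drop stale trailing bytes
```
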
38 changes: 29 additions & 9 deletions src/taipy/core/data/mongo.py
@@ -217,35 +217,55 @@ def _read_by_query(self, operators: Optional[Union[List, Tuple]] = None, join_op

return self.collection.find(query)

    def _append(self, data) -> None:
        """Append data to a Mongo collection."""
        if not isinstance(data, list):
            data = [data]

        if len(data) == 0:
            return

        if isinstance(data[0], dict):
            self._insert_dicts(data)
        else:
            self._insert_dicts([self._encoder(row) for row in data])

    def _write(self, data) -> None:
        """Check data against a collection of types to handle insertion on the database.

        Parameters:
            data (Any): the data to write to the database.
        """
        if not isinstance(data, list):
            data = [data]

        if len(data) == 0:
            self.collection.drop()
            return

        if isinstance(data[0], dict):
            self._insert_dicts(data, drop=True)
        else:
            self._insert_dicts([self._encoder(row) for row in data], drop=True)

    def _insert_dicts(self, data: List[Dict], drop=False) -> None:
        """Insert a list of dictionaries into the collection.

        Parameters:
            data (List[Dict]): a list of dictionaries to insert.
            drop (bool): if True, drop the collection first so the insert overwrites its previous content.
        """
        if drop:
            self.collection.drop()

        self.collection.insert_many(data)

def _default_decoder(self, document: Dict) -> Any:
"""Decode a Mongo dictionary to a custom document object for reading.
        Parameters:
            document (Dict): the document dictionary returned by the Mongo query.
Returns:
A custom document object.
"""
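
The net effect of the refactor: `_write` keeps its drop-then-insert semantics via `drop=True`, while `_append` inserts into whatever is already stored. Sketched against a hypothetical, already-configured Mongo collection data node `dn`:

```python
dn.write([{"a": 1}])  # drops the collection, then inserts -> contents: [{"a": 1}]
dn.append({"a": 2})   # a single document is wrapped in a list -> contents: [{"a": 1}, {"a": 2}]
dn.append([])         # appending nothing is a no-op
```
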
3 changes: 3 additions & 0 deletions src/taipy/core/data/parquet.py
@@ -227,6 +227,9 @@ def _read_as_pandas_dataframe(self, read_kwargs: Dict) -> pd.DataFrame:
def _read_as_modin_dataframe(self, read_kwargs: Dict) -> modin_pd.DataFrame:
return modin_pd.read_parquet(self._path, **read_kwargs)

def _append(self, data: Any):
self.write_with_kwargs(data, engine="fastparquet", append=True)

def _write(self, data: Any):
self.write_with_kwargs(data)

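
The engine switch is the point here: pandas' pyarrow path has no append mode, while the fastparquet engine accepts `append=True` for a file that already exists. The underlying pandas call, for reference (the path is illustrative and fastparquet must be installed):

```python
import pandas as pd

pd.DataFrame({"x": [1, 2]}).to_parquet("data.parquet", engine="fastparquet")
pd.DataFrame({"x": [3]}).to_parquet("data.parquet", engine="fastparquet", append=True)
print(len(pd.read_parquet("data.parquet")))  # 3
```
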
20 changes: 17 additions & 3 deletions src/taipy/core/data/sql.py
@@ -17,7 +17,7 @@
from taipy.config.common.scope import Scope

from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import MissingAppendQueryBuilder, MissingRequiredProperty
from ._abstract_sql import _AbstractSQLDataNode
from .data_node_id import DataNodeId, Edit

@@ -55,7 +55,9 @@ class SQLDataNode(_AbstractSQLDataNode):
_"postgresql"_.
- _"read_query"_ `(str)`: The SQL query string used to read the data from the database.
- _"write_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
        returns a list of SQL queries to be executed when writing data to the data node.
- _"append_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
returns a list of SQL queries to be executed when appending data to the data node.
- _"db_username"_ `(str)`: The database username.
- _"db_password"_ `(str)`: The database password.
- _"db_host"_ `(str)`: The database host. The default value is _"localhost"_.
@@ -72,6 +74,7 @@ class SQLDataNode(_AbstractSQLDataNode):
__STORAGE_TYPE = "sql"
__READ_QUERY_KEY = "read_query"
_WRITE_QUERY_BUILDER_KEY = "write_query_builder"
_APPEND_QUERY_BUILDER_KEY = "append_query_builder"

def __init__(
self,
@@ -116,6 +119,7 @@ def __init__(
{
self.__READ_QUERY_KEY,
self._WRITE_QUERY_BUILDER_KEY,
self._APPEND_QUERY_BUILDER_KEY,
}
)

@@ -126,9 +130,19 @@ def storage_type(cls) -> str:
def _get_base_read_query(self) -> str:
return self.properties.get(self.__READ_QUERY_KEY)

def _do_append(self, data, engine, connection) -> None:
if not self.properties.get(self._APPEND_QUERY_BUILDER_KEY):
raise MissingAppendQueryBuilder

queries = self.properties.get(self._APPEND_QUERY_BUILDER_KEY)(data)
self.__execute_queries(queries, connection)

    def _do_write(self, data, engine, connection) -> None:
        queries = self.properties.get(self._WRITE_QUERY_BUILDER_KEY)(data)
        self.__execute_queries(queries, connection)

    def __execute_queries(self, queries, connection) -> None:
        if not isinstance(queries, List):
            queries = [queries]
        for query in queries:
            if isinstance(query, str):
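
Judging from `__execute_queries`, a builder may return a single query or a list of them, and the `isinstance(query, str)` branch suggests each entry is either a bare SQL string or a richer form such as a (statement, parameters) pair. A hedged sketch of an append builder using both forms (table and columns are hypothetical):

```python
def append_query_builder(data):
    rows = data.to_dict(orient="records")
    return [
        "CREATE TABLE IF NOT EXISTS sales (day INTEGER, amount REAL)",  # plain string form
        ("INSERT INTO sales (day, amount) VALUES (:day, :amount)", rows),  # (statement, parameters) form
    ]
```
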
