This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Feature/#755 - Open append() method on datanodes #824

Merged: 6 commits on Nov 15, 2023
Changes from 4 commits
@@ -164,6 +164,7 @@ def _check_callable(self, data_node_config_id: str, data_node_config: DataNodeCo
],
DataNodeConfig._STORAGE_TYPE_VALUE_SQL: [
DataNodeConfig._REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY,
DataNodeConfig._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY,
],
}

5 changes: 5 additions & 0 deletions src/taipy/core/config/config.schema.json
@@ -178,6 +178,11 @@
"type": "string",
"taipy_function": true
},
"append_query_builder": {
"description": "storage_type: sql specific. A callable function that takes in the data as an input parameter and returns a list of SQL queries to be executed when the append data node method is called.",
"type": "string",
"taipy_function": true
},
"collection_name ": {
"description": "storage_type: mongo_collection specific.",
"type": "string"
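The `append_query_builder` described in this schema entry is just a callable that maps the incoming data to SQL statements. A minimal sketch (the `orders` table and the record shape are illustrative assumptions, not part of this PR):

```python
from typing import Any, List


def append_query_builder(data: Any) -> List[str]:
    # One INSERT per record; "orders" and its columns are hypothetical, and a
    # real builder should prefer parameterized queries over f-strings.
    return [
        f"INSERT INTO orders (name, total) VALUES ('{row['name']}', {row['total']})"
        for row in data
    ]
```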
9 changes: 8 additions & 1 deletion src/taipy/core/config/data_node_config.py
@@ -120,6 +120,7 @@ class DataNodeConfig(Section):
# SQL
_REQUIRED_READ_QUERY_SQL_PROPERTY = "read_query"
_REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY = "write_query_builder"
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY = "append_query_builder"
# MONGO
_REQUIRED_DB_NAME_MONGO_PROPERTY = "db_name"
_REQUIRED_COLLECTION_NAME_MONGO_PROPERTY = "collection_name"
@@ -207,6 +208,7 @@ class DataNodeConfig(Section):
_OPTIONAL_HOST_SQL_PROPERTY: "localhost",
_OPTIONAL_PORT_SQL_PROPERTY: 1433,
_OPTIONAL_DRIVER_SQL_PROPERTY: "",
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY: None,
_OPTIONAL_FOLDER_PATH_SQLITE_PROPERTY: None,
_OPTIONAL_FILE_EXTENSION_SQLITE_PROPERTY: ".db",
_OPTIONAL_DB_EXTRA_ARGS_SQL_PROPERTY: None,
@@ -867,6 +869,7 @@ def _configure_sql(
db_engine: str,
read_query: str,
write_query_builder: Callable,
append_query_builder: Optional[Callable] = None,
db_username: Optional[str] = None,
db_password: Optional[str] = None,
db_host: Optional[str] = None,
@@ -889,7 +892,9 @@ def _configure_sql(
or *"postgresql"*.
read_query (str): The SQL query string used to read the data from the database.
write_query_builder (Callable): A callback function that takes the data as an input parameter
and returns a list of SQL queries.
and returns a list of SQL queries to be executed when writing data to the data node.
append_query_builder (Optional[Callable]): A callback function that takes the data as an input parameter
and returns a list of SQL queries to be executed when appending data to the data node.
db_username (Optional[str]): The database username. Required by the *"mssql"*, *"mysql"*, and
*"postgresql"* engines.
db_password (Optional[str]): The database password. Required by the *"mssql"*, *"mysql"*, and
@@ -927,6 +932,8 @@ def _configure_sql(
}
)

if append_query_builder is not None:
properties[cls._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY] = append_query_builder
if db_username is not None:
properties[cls._OPTIONAL_DB_USERNAME_SQL_PROPERTY] = db_username
if db_password is not None:
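Putting the new parameter together with the `_configure_sql` signature above, a configuration call through the public API might look like the following sketch. The SQLite settings and both builders are placeholders, and `Config.configure_sql_data_node` is assumed to be the public wrapper around `_configure_sql`:

```python
from taipy import Config


def write_query_builder(data) -> list:
    # Replace-all semantics: clear the hypothetical "orders" table, then re-insert.
    return ["DELETE FROM orders"] + [
        f"INSERT INTO orders (name) VALUES ('{row['name']}')" for row in data
    ]


def append_query_builder(data) -> list:
    # Insert-only semantics: existing rows are left untouched.
    return [f"INSERT INTO orders (name) VALUES ('{row['name']}')" for row in data]


orders_cfg = Config.configure_sql_data_node(
    id="orders",
    db_name="inventory",
    db_engine="sqlite",
    read_query="SELECT * FROM orders",
    write_query_builder=write_query_builder,
    append_query_builder=append_query_builder,
)
```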
16 changes: 16 additions & 0 deletions src/taipy/core/data/_abstract_sql.py
@@ -289,6 +289,22 @@ def _get_read_query(self, operators: Optional[Union[List, Tuple]] = None, join_o
def _get_base_read_query(self) -> str:
raise NotImplementedError

def _append(self, data) -> None:
engine = self._get_engine()
with engine.connect() as connection:
with connection.begin() as transaction:
try:
self._do_append(data, engine, connection)
except Exception as e:
transaction.rollback()
raise e
else:
transaction.commit()

@abstractmethod
def _do_append(self, data, engine, connection) -> None:
raise NotImplementedError

def _write(self, data) -> None:
"""Check data against a collection of types to handle insertion on the database."""
engine = self._get_engine()
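`_append` is a template method: it opens the connection, starts a transaction, and rolls back if `_do_append` raises, so a concrete subclass only supplies the statements. A hypothetical subclass, for illustration only (the table name and import path are assumptions):

```python
from sqlalchemy import text

from taipy.core.data._abstract_sql import _AbstractSQLDataNode


class GuestbookDataNode(_AbstractSQLDataNode):
    def _do_append(self, data, engine, connection) -> None:
        # Runs inside the transaction opened by _append(); no commit or
        # rollback is needed here.
        for entry in data:
            connection.execute(
                text("INSERT INTO guestbook (message) VALUES (:message)"),
                {"message": entry},
            )
```

This keeps transaction handling in one place while each backend decides how rows are inserted.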
25 changes: 21 additions & 4 deletions src/taipy/core/data/_data_converter.py
@@ -33,6 +33,8 @@ class _DataNodeConverter(_AbstractConverter):
_EXPOSED_TYPE_KEY = "exposed_type"
__WRITE_QUERY_BUILDER_NAME_KEY = "write_query_builder_name"
__WRITE_QUERY_BUILDER_MODULE_KEY = "write_query_builder_module"
__APPEND_QUERY_BUILDER_NAME_KEY = "append_query_builder_name"
__APPEND_QUERY_BUILDER_MODULE_KEY = "append_query_builder_module"
# TODO: This limits the valid string to only the ones provided by the Converter.
# While in practice, each data nodes might have different exposed type possibilities.
# The previous implementation used tabular datanode but it's no longer suitable so
@@ -71,11 +73,16 @@ def __serialize_json_dn_properties(cls, datanode_properties: dict):

@classmethod
def __serialize_sql_dn_properties(cls, datanode_properties: dict) -> dict:
query_builder = datanode_properties.get(SQLDataNode._WRITE_QUERY_BUILDER_KEY)
datanode_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY] = query_builder.__name__ if query_builder else None
datanode_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY] = query_builder.__module__ if query_builder else None
write_qb = datanode_properties.get(SQLDataNode._WRITE_QUERY_BUILDER_KEY)
datanode_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY] = write_qb.__name__ if write_qb else None
datanode_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY] = write_qb.__module__ if write_qb else None
datanode_properties.pop(SQLDataNode._WRITE_QUERY_BUILDER_KEY, None)

append_qb = datanode_properties.get(SQLDataNode._APPEND_QUERY_BUILDER_KEY)
datanode_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY] = append_qb.__name__ if append_qb else None
datanode_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY] = append_qb.__module__ if append_qb else None
datanode_properties.pop(SQLDataNode._APPEND_QUERY_BUILDER_KEY, None)

return datanode_properties

@classmethod
@@ -209,7 +216,6 @@ def __deserialize_json_dn_properties(cls, datanode_model_properties: dict) -> di

@classmethod
def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict) -> dict:

if datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._WRITE_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY],
@@ -221,6 +227,17 @@ def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict)
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]

if datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY],
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY],
)
else:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = None

del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]

return datanode_model_properties

@classmethod
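The converter persists each builder as a (module, name) pair and restores it with `_load_fct`. A rough standalone equivalent, assuming `_load_fct` resolves the pair via `importlib` (note this scheme only works for module-level functions, not lambdas or closures):

```python
import importlib
from typing import Callable


def dump_fct(fct: Callable) -> dict:
    # Mirror what the converter stores in the data node model.
    return {"module": fct.__module__, "name": fct.__name__}


def load_fct(module_name: str, fct_name: str) -> Callable:
    # Assumed equivalent of _load_fct: re-import the module, fetch the attribute.
    return getattr(importlib.import_module(module_name), fct_name)
```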
8 changes: 8 additions & 0 deletions src/taipy/core/data/csv.py
@@ -233,6 +233,14 @@ def _read_as_modin_dataframe(
except pd.errors.EmptyDataError:
return modin_pd.DataFrame()

def _append(self, data: Any):
if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
data.to_csv(self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False)
else:
pd.DataFrame(data).to_csv(
self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False
)

def _write(self, data: Any):
if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
data.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
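The CSV implementation leans on pandas' append mode. A standalone equivalent of `_write` versus `_append` (the file name is a placeholder):

```python
import pandas as pd

pd.DataFrame({"a": [1], "b": [2]}).to_csv("data.csv", index=False)  # write: header plus rows
pd.DataFrame({"a": [3], "b": [4]}).to_csv(
    "data.csv", mode="a", index=False, header=False  # append: rows only, no second header
)
```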
20 changes: 20 additions & 0 deletions src/taipy/core/data/data_node.py
@@ -332,6 +332,22 @@ def read(self) -> Any:
)
return None

def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
"""Append some data to this data node.

Parameters:
data (Any): The data to append to this data node.
job_id (JobId^): An optional identifier of the writer.
**kwargs (dict[str, any]): Extra information to attach to the edit document
    corresponding to this append.
"""
from ._data_manager_factory import _DataManagerFactory

self._append(data)
self.track_edit(job_id=job_id, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
"""Write some data to this data node.

@@ -438,6 +454,10 @@ def __getitem__(self, item):
def _read(self):
raise NotImplementedError

@abstractmethod
def _append(self, data):
raise NotImplementedError

@abstractmethod
def _write(self, data):
raise NotImplementedError
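From the caller's side, the new public method mirrors `write()` but keeps existing content. A usage sketch, assuming `dn` is any data node whose storage type implements `_append`:

```python
dn.write([{"name": "Alice"}])  # replaces the existing content
dn.append([{"name": "Bob"}])   # adds to it, records an edit, and unlocks the node
print(dn.read())               # both rows are now visible
```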
40 changes: 40 additions & 0 deletions src/taipy/core/data/excel.py
@@ -311,6 +311,46 @@ def _read_as_modin_dataframe(
except pd.errors.EmptyDataError:
return modin_pd.DataFrame()

def __append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)

with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
if sheet_name:
if not isinstance(sheet_name, str):
sheet_name = sheet_name[0]
append_excel_fct(
writer, *args, **kwargs, sheet_name=sheet_name, startrow=writer.sheets[sheet_name].max_row
)
else:
sheet_name = list(writer.sheets.keys())[0]
append_excel_fct(writer, *args, **kwargs, startrow=writer.sheets[sheet_name].max_row)

def __append_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
# Each key stands for a sheet name
for sheet_name in data.keys():
if isinstance(data[sheet_name], np.ndarray):
df = pd.DataFrame(data[sheet_name])
else:
df = data[sheet_name]

if columns:
    df.columns = columns

df.to_excel(
writer, sheet_name=sheet_name, index=False, header=False, startrow=writer.sheets[sheet_name].max_row
)

def _append(self, data: Any):
if isinstance(data, Dict) and all(
[isinstance(x, (pd.DataFrame, modin_pd.DataFrame, np.ndarray)) for x in data.values()]
):
self.__append_excel_with_multiple_sheets(data)
elif isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
self.__append_excel_with_single_sheet(data.to_excel, index=False, header=False)
else:
self.__append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)

def __write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)
if sheet_name:
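Both helpers rely on pandas' `if_sheet_exists="overlay"` mode (pandas >= 1.4 with openpyxl) and use the sheet's current `max_row` as the starting row, exactly as `_append` does above. A standalone equivalent (the file and sheet are assumed to exist):

```python
import pandas as pd

with pd.ExcelWriter("data.xlsx", mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
    start = writer.sheets["Sheet1"].max_row  # first free row of the existing sheet
    pd.DataFrame([[5, 6]]).to_excel(
        writer, sheet_name="Sheet1", index=False, header=False, startrow=start
    )
```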
14 changes: 14 additions & 0 deletions src/taipy/core/data/json.py
@@ -183,6 +183,20 @@ def _read(self):
with open(self._path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
return json.load(f, cls=self._decoder)

def _append(self, data: Any):
with open(self._path, "r+", encoding=self.properties[self.__ENCODING_KEY]) as f:
file_data = json.load(f, cls=self._decoder)
if isinstance(file_data, List):
if isinstance(data, List):
file_data.extend(data)
else:
file_data.append(data)
elif isinstance(data, Dict):
file_data.update(data)

f.seek(0)
json.dump(file_data, f, indent=4, cls=self._encoder)

def _write(self, data: Any):
with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
json.dump(data, f, indent=4, cls=self._encoder)
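The JSON behavior depends on what the file already holds: a list is extended (or gets the new value appended), and a dict is merged via `update`. A usage sketch, assuming `json_dn` is a configured JSON data node:

```python
json_dn.write([1, 2])
json_dn.append([3, 4])    # file now holds [1, 2, 3, 4]
json_dn.append(5)         # file now holds [1, 2, 3, 4, 5]

json_dn.write({"a": 1})
json_dn.append({"b": 2})  # file now holds {"a": 1, "b": 2}
```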
38 changes: 29 additions & 9 deletions src/taipy/core/data/mongo.py
@@ -217,35 +217,55 @@ def _read_by_query(self, operators: Optional[Union[List, Tuple]] = None, join_op

return self.collection.find(query)

def _write(self, data) -> None:
"""Check data against a collection of types to handle insertion on the database."""
def _append(self, data) -> None:
"""Append data to a Mongo collection."""
if not isinstance(data, list):
data = [data]

if len(data) == 0:
self.collection.drop()
return

if isinstance(data[0], dict):
self._insert_dicts(data)
else:
self._insert_dicts([self._encoder(row) for row in data])

def _insert_dicts(self, data: List[Dict]) -> None:
    """
    :param data: a list of dictionaries

    This method will overwrite the data contained in a list of dictionaries into a collection.
    """
def _write(self, data) -> None:
    """Check data against a collection of types to handle insertion on the database.

    Parameters:
        data (Any): the data to write to the database.
    """
    if not isinstance(data, list):
        data = [data]

    if len(data) == 0:
        self.collection.drop()
        return

    if isinstance(data[0], dict):
        self._insert_dicts(data, drop=True)
    else:
        self._insert_dicts([self._encoder(row) for row in data], drop=True)

def _insert_dicts(self, data: List[Dict], drop=False) -> None:
"""
This method will insert data contained in a list of dictionaries into a collection.

Parameters:
data (List[Dict]): a list of dictionaries
drop (bool): drop the collection before inserting the data to overwrite the data in the collection.
"""
self.collection.drop()
if drop:
self.collection.drop()

self.collection.insert_many(data)

def _default_decoder(self, document: Dict) -> Any:
"""Decode a Mongo dictionary to a custom document object for reading.

Args:
Parameters:
document (Dict): the document dictionary return by Mongo query.

Returns:
A custom document object.
"""
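The net effect of this refactoring is that `_write` keeps its replace-all semantics (drop, then insert) while `_append` inserts into the existing collection. Assuming `mongo_dn` is a configured Mongo collection data node:

```python
mongo_dn.write([{"name": "Alice"}])  # drops the collection, then inserts
mongo_dn.append([{"name": "Bob"}])   # inserts without dropping
```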
3 changes: 3 additions & 0 deletions src/taipy/core/data/parquet.py
@@ -227,6 +227,9 @@ def _read_as_pandas_dataframe(self, read_kwargs: Dict) -> pd.DataFrame:
def _read_as_modin_dataframe(self, read_kwargs: Dict) -> modin_pd.DataFrame:
return modin_pd.read_parquet(self._path, **read_kwargs)

def _append(self, data: Any):
self.write_with_kwargs(data, engine="fastparquet", append=True)

def _write(self, data: Any):
self.write_with_kwargs(data)

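`write_with_kwargs` forwards keyword arguments to `pandas.DataFrame.to_parquet`, and the fastparquet engine accepts `append=True` to add row groups to an existing file (pyarrow does not, which is presumably why the engine is pinned here). A standalone equivalent, assuming fastparquet is installed:

```python
import pandas as pd

pd.DataFrame({"a": [1]}).to_parquet("data.parquet", engine="fastparquet")
pd.DataFrame({"a": [2]}).to_parquet("data.parquet", engine="fastparquet", append=True)
```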
15 changes: 13 additions & 2 deletions src/taipy/core/data/sql.py
@@ -55,7 +55,9 @@ class SQLDataNode(_AbstractSQLDataNode):
_"postgresql"_.
- _"read_query"_ `(str)`: The SQL query string used to read the data from the database.
- _"write_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
returns a list of SQL queries.
returns a list of SQL queries to be executed when writing data to the data node.
- _"append_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
returns a list of SQL queries to be executed when appending data to the data node.
- _"db_username"_ `(str)`: The database username.
- _"db_password"_ `(str)`: The database password.
- _"db_host"_ `(str)`: The database host. The default value is _"localhost"_.
@@ -72,6 +74,7 @@
__STORAGE_TYPE = "sql"
__READ_QUERY_KEY = "read_query"
_WRITE_QUERY_BUILDER_KEY = "write_query_builder"
_APPEND_QUERY_BUILDER_KEY = "append_query_builder"

def __init__(
self,
@@ -116,6 +119,7 @@ def __init__(
{
self.__READ_QUERY_KEY,
self._WRITE_QUERY_BUILDER_KEY,
self._APPEND_QUERY_BUILDER_KEY,
}
)

@@ -126,9 +130,16 @@
def _get_base_read_query(self) -> str:
return self.properties.get(self.__READ_QUERY_KEY)

def _do_append(self, data, engine, connection) -> None:
queries = self.properties.get(self._APPEND_QUERY_BUILDER_KEY)(data)
self.__execute_queries(queries, connection)

def _do_write(self, data, engine, connection) -> None:
queries = self.properties.get(self._WRITE_QUERY_BUILDER_KEY)(data)
if not isinstance(queries, list):
    queries = [queries]
self.__execute_queries(queries, connection)

def __execute_queries(self, queries, connection) -> None:
if not isinstance(queries, List):
queries = [queries]
for query in queries:
if isinstance(query, str):
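The `isinstance(query, str)` check suggests that `__execute_queries` accepts more than plain strings; each entry is presumably either a SQL string or a (statement, parameters) pair, although the truncated diff does not show the non-string branch. A hedged sketch of a builder using that assumed pair form:

```python
def append_query_builder(data) -> list:
    # Assumed (sql, parameters) form; verify against __execute_queries before
    # relying on it. "orders" and the record shape are hypothetical.
    return [
        ("INSERT INTO orders (name, total) VALUES (:name, :total)", row)
        for row in data.to_dict("records")  # assumes a pandas DataFrame input
    ]
```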