This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Feature/#755 - Open append() method on datanodes #824

Merged — 6 commits, Nov 15, 2023
8 changes: 8 additions & 0 deletions src/taipy/core/data/csv.py
@@ -233,6 +233,14 @@ def _read_as_modin_dataframe(
        except pd.errors.EmptyDataError:
            return modin_pd.DataFrame()

    def _append(self, data: Any):
        if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
            data.to_csv(self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False)
        else:
            pd.DataFrame(data).to_csv(
                self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False
            )

    def _write(self, data: Any):
        if isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
            data.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
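On the CSV side, append maps directly onto pandas' append mode: `mode="a"` with `header=False` adds rows under the existing content instead of rewriting the file. A minimal standalone sketch of that behaviour, with a made-up file name and data:

import pandas as pd

path = "example.csv"  # hypothetical file path

# Initial write: header plus two rows.
pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}]).to_csv(path, index=False)

# Append: mode="a" adds rows at the end; header=False avoids repeating the header row.
pd.DataFrame([{"a": 5, "b": 6}]).to_csv(path, mode="a", index=False, header=False)

print(pd.read_csv(path))  # three rows: (1, 2), (3, 4), (5, 6)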
20 changes: 20 additions & 0 deletions src/taipy/core/data/data_node.py
@@ -332,6 +332,22 @@ def read(self) -> Any:
            )
        return None

    def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Append some data to this data node.

        Parameters:
            data (Any): The data to append to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this append.
        """
        from ._data_manager_factory import _DataManagerFactory

        self._append(data)
        self.track_edit(job_id=job_id, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)

    def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Write some data to this data node.

@@ -438,6 +454,10 @@ def __getitem__(self, item):
    def _read(self):
        raise NotImplementedError

    @abstractmethod
    def _append(self, data):
        raise NotImplementedError

    @abstractmethod
    def _write(self, data):
        raise NotImplementedError
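From the caller's side, `append()` mirrors `write()`: it delegates to the subclass's `_append()`, tracks an edit, and unlocks the node. A small usage sketch modelled on the new tests below; the node id, file path, and import paths are illustrative and may differ slightly depending on the Taipy version:

import pandas as pd

# Import paths follow the taipy-core source layout and may vary by version.
from taipy.config.common.scope import Scope
from taipy.core.data.csv import CSVDataNode

# Build a CSV data node directly, as the new tests do; id and path are made up.
dn = CSVDataNode("sales_history", Scope.SCENARIO, properties={"path": "sales.csv"})

dn.write(pd.DataFrame([{"date": "2023-11-01", "qty": 3}]))   # overwrite the file
dn.append(pd.DataFrame([{"date": "2023-11-02", "qty": 5}]))  # keep existing rows, add new ones
print(dn.read())                                             # two rows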
40 changes: 40 additions & 0 deletions src/taipy/core/data/excel.py
@@ -311,6 +311,46 @@ def _read_as_modin_dataframe(
        except pd.errors.EmptyDataError:
            return modin_pd.DataFrame()

    def __append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
        sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)

        with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
            if sheet_name:
                if not isinstance(sheet_name, str):
                    sheet_name = sheet_name[0]
                append_excel_fct(
                    writer, *args, **kwargs, sheet_name=sheet_name, startrow=writer.sheets[sheet_name].max_row
                )
            else:
                sheet_name = list(writer.sheets.keys())[0]
                append_excel_fct(writer, *args, **kwargs, startrow=writer.sheets[sheet_name].max_row)

    def __append_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
        with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
            # Each key stands for a sheet name
            for sheet_name in data.keys():
                if isinstance(data[sheet_name], np.ndarray):
                    df = pd.DataFrame(data[sheet_name])
                else:
                    df = data[sheet_name]

                if columns:
                    df.columns = columns

                df.to_excel(
                    writer, sheet_name=sheet_name, index=False, header=False, startrow=writer.sheets[sheet_name].max_row
                )

    def _append(self, data: Any):
        if isinstance(data, Dict) and all(
            [isinstance(x, (pd.DataFrame, modin_pd.DataFrame, np.ndarray)) for x in data.values()]
        ):
            self.__append_excel_with_multiple_sheets(data)
        elif isinstance(data, (pd.DataFrame, modin_pd.DataFrame)):
            self.__append_excel_with_single_sheet(data.to_excel, index=False, header=False)
        else:
            self.__append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)

    def __write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
        sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)
        if sheet_name:
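The Excel append relies on `pd.ExcelWriter` opened with `mode="a"` and `if_sheet_exists="overlay"` (pandas ≥ 1.4 with the openpyxl engine), using `startrow` to land just below the last used row of the target sheet. A standalone sketch of the same technique, with an illustrative workbook and sheet name:

import pandas as pd

path = "report.xlsx"  # hypothetical workbook

# Create the workbook with an initial sheet.
pd.DataFrame([{"a": 1, "b": 2}]).to_excel(path, sheet_name="data", index=False)

# Re-open it in append mode and overlay new rows below the existing ones.
new_rows = pd.DataFrame([{"a": 3, "b": 4}, {"a": 5, "b": 6}])
with pd.ExcelWriter(path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
    new_rows.to_excel(
        writer,
        sheet_name="data",
        index=False,
        header=False,                            # the sheet already has a header row
        startrow=writer.sheets["data"].max_row,  # first free row of the existing sheet
    )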
14 changes: 14 additions & 0 deletions src/taipy/core/data/json.py
@@ -183,6 +183,20 @@ def _read(self):
        with open(self._path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
            return json.load(f, cls=self._decoder)

    def _append(self, data: Any):
        with open(self._path, "r+", encoding=self.properties[self.__ENCODING_KEY]) as f:
            file_data = json.load(f, cls=self._decoder)
            if isinstance(file_data, List):
                if isinstance(data, List):
                    file_data.extend(data)
                else:
                    file_data.append(data)
            elif isinstance(data, Dict):
                file_data.update(data)

            f.seek(0)
            json.dump(file_data, f, indent=4, cls=self._encoder)

    def _write(self, data: Any):
        with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f:  # type: ignore
            json.dump(data, f, indent=4, cls=self._encoder)
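For JSON, append is a read-modify-write: load the current document, extend the list (or merge a dict), rewind, and dump. The same pattern without the data node wrapper, using the standard encoder and an illustrative file name:

import json

path = "data.json"  # hypothetical file holding a JSON list

with open(path, "w", encoding="utf-8") as f:
    json.dump([{"a": 1}], f, indent=4)

with open(path, "r+", encoding="utf-8") as f:
    file_data = json.load(f)          # read the existing document
    file_data.extend([{"a": 2}])      # append the new records
    f.seek(0)                         # rewind before rewriting
    json.dump(file_data, f, indent=4)
    f.truncate()                      # defensive: drop leftover bytes if the new payload were ever shorter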
38 changes: 29 additions & 9 deletions src/taipy/core/data/mongo.py
@@ -217,35 +217,55 @@ def _read_by_query(self, operators: Optional[Union[List, Tuple]] = None, join_op

        return self.collection.find(query)

    def _append(self, data) -> None:
        """Append data to a Mongo collection."""
        if not isinstance(data, list):
            data = [data]

        if len(data) == 0:
            self.collection.drop()
            return

        if isinstance(data[0], dict):
            self._insert_dicts(data)
        else:
            self._insert_dicts([self._encoder(row) for row in data])

    def _write(self, data) -> None:
        """Check data against a collection of types to handle insertion on the database.

        Parameters:
            data (Any): the data to write to the database.
        """
        if not isinstance(data, list):
            data = [data]

        if len(data) == 0:
            self.collection.drop()
            return

        if isinstance(data[0], dict):
            self._insert_dicts(data, drop=True)
        else:
            self._insert_dicts([self._encoder(row) for row in data], drop=True)

    def _insert_dicts(self, data: List[Dict], drop=False) -> None:
        """Insert the data contained in a list of dictionaries into the collection.

        Parameters:
            data (List[Dict]): a list of dictionaries.
            drop (bool): if True, drop the collection before inserting, overwriting its content.
        """
        if drop:
            self.collection.drop()

        self.collection.insert_many(data)

    def _default_decoder(self, document: Dict) -> Any:
        """Decode a Mongo dictionary to a custom document object for reading.

        Parameters:
            document (Dict): the document dictionary returned by the Mongo query.

        Returns:
            A custom document object.
        """
91 changes: 58 additions & 33 deletions src/taipy/core/data/sql_table.py
@@ -116,25 +116,52 @@ def storage_type(cls) -> str:
    def _get_base_read_query(self) -> str:
        return f"SELECT * FROM {self.properties[self.__TABLE_KEY]}"

    def _append(self, data) -> None:
        engine = self._get_engine()
        with engine.connect() as connection:
            with connection.begin() as transaction:
                try:
                    self.__insert_data(data, engine, connection)
                except Exception as e:
                    transaction.rollback()
                    raise e
                else:
                    transaction.commit()

    def _do_write(self, data, engine, connection) -> None:
        self.__insert_data(data, engine, connection, delete_table=True)

    def __insert_data(self, data, engine, connection, delete_table: bool = False) -> None:
        """
        Insert data into a SQL table.

        Parameters:
            data (Any): the data to insert; a DataFrame, a NumPy array, a list of tuples or
                dictionaries, or a primitive value.
            engine: a SQLAlchemy engine used to reflect the target table.
            connection: a SQLAlchemy connection to write the data.
            delete_table (bool): indicates if the existing rows of the table should be deleted before
                inserting the data.
        """
        table = self._create_table(engine)
        if isinstance(data, (modin_pd.DataFrame, pd.DataFrame)):
            self.__insert_dataframe(data, table, connection, delete_table)
            return

        if isinstance(data, np.ndarray):
            data = data.tolist()
        if not isinstance(data, list):
            data = [data]

        if len(data) == 0:
            self.__delete_all_rows(table, connection, delete_table)
            return

        if isinstance(data[0], (tuple, list)):
            self.__insert_tuples(data, table, connection, delete_table)
        elif isinstance(data[0], dict):
            self.__insert_dicts(data, table, connection, delete_table)
        # If data is a primitive type, it will be inserted as a tuple of one element.
        else:
            self.__insert_tuples([(x,) for x in data], table, connection, delete_table)

    def _create_table(self, engine) -> Table:
        return Table(
@@ -143,37 +170,35 @@ def _create_table(self, engine) -> Table:
            autoload_with=engine,
        )

    @classmethod
    def __insert_dicts(cls, data: List[Dict], table: Any, connection: Any, delete_table: bool) -> None:
        """
        This method will insert the data contained in a list of dictionaries into a table. The query itself is handled
        by SQLAlchemy, so it's only needed to pass the correct data type.
        """
        cls.__delete_all_rows(table, connection, delete_table)
        connection.execute(table.insert(), data)

    @classmethod
    def __insert_dataframe(
        cls, df: Union[modin_pd.DataFrame, pd.DataFrame], table: Any, connection: Any, delete_table: bool
    ) -> None:
        cls.__insert_dicts(df.to_dict(orient="records"), table, connection, delete_table)

    @classmethod
    def __insert_tuples(cls, data: List[Union[Tuple, List]], table: Any, connection: Any, delete_table: bool) -> None:
        """
        This method will look up the length of the first object of the list and build the insert through
        creation of a string of '?' equivalent to the length of the element. The '?' character is used as
        placeholder for a tuple of same size.
        """
        cls.__delete_all_rows(table, connection, delete_table)
        markers = ",".join("?" * len(data[0]))
        ins = "INSERT INTO {tablename} VALUES ({markers})"
        ins = ins.format(tablename=table.name, markers=markers)
        connection.execute(ins, data)

    @classmethod
    def __delete_all_rows(cls, table: Any, connection: Any, delete_table: bool) -> None:
        if delete_table:
            connection.execute(table.delete())
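`_append` opens its own connection and wraps the insert in an explicit transaction that rolls back on failure, while `_do_write` reuses the same insert path with `delete_table=True` to clear existing rows first. A condensed sketch of that transaction pattern with SQLAlchemy, using an illustrative in-memory SQLite table:

import pandas as pd
from sqlalchemy import Column, Integer, MetaData, Table, create_engine

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
table = Table("example", metadata, Column("a", Integer), Column("b", Integer))
metadata.create_all(engine)  # create the target table up front

rows = pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}]).to_dict(orient="records")

with engine.connect() as connection:
    with connection.begin() as transaction:
        try:
            # An overwrite (delete_table=True) would run connection.execute(table.delete()) first.
            connection.execute(table.insert(), rows)  # append: insert without clearing existing rows
        except Exception:
            transaction.rollback()
            raise
        else:
            transaction.commit()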
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -101,7 +101,7 @@ def excel_file_with_sheet_name(tmpdir_factory) -> str:
def json_file(tmpdir_factory) -> str:
    json_data = pd.DataFrame([{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}])
    fn = tmpdir_factory.mktemp("data").join("df.json")
    json_data.to_json(str(fn), orient="records")
    return fn.strpath
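The fixture change matters because `DataFrame.to_json()` defaults to the "columns" orientation (a dict keyed by column), whereas `orient="records"` produces a top-level list of row objects, which is presumably what the JSON data node's `_append` needs to extend. For illustration:

import pandas as pd

df = pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}])

print(df.to_json())                  # {"a":{"0":1,"1":3},"b":{"0":2,"1":4}}  (dict keyed by column)
print(df.to_json(orient="records"))  # [{"a":1,"b":2},{"a":3,"b":4}]          (list of row objects)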


38 changes: 38 additions & 0 deletions tests/core/data/test_csv_data_node.py
@@ -201,6 +201,44 @@ def test_read_without_header(self):
            assert str(row_pandas[1]) == row_custom.integer
            assert row_pandas[2] == row_custom.text

    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
            ([[11, 22, 33], [44, 55, 66]]),
        ],
    )
    def test_append(self, csv_file, default_data_frame, content):
        csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file})
        assert_frame_equal(csv_dn.read(), default_data_frame)

        csv_dn.append(content)
        assert_frame_equal(
            csv_dn.read(),
            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
        )

    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
            ([[11, 22, 33], [44, 55, 66]]),
        ],
    )
    def test_append_modin(self, csv_file, default_data_frame, content):
        csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file, "exposed_type": "modin"})
        df_equals(csv_dn.read(), modin_pd.DataFrame(default_data_frame))

        csv_dn.append(content)
        df_equals(
            csv_dn.read(),
            modin_pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(
                drop=True
            ),
        )

    @pytest.mark.parametrize(
        "content,columns",
        [