Commit
added tests + minor changes
Toan Quach authored and committed Jun 19, 2024
1 parent 892ab38 commit 612ef76
Showing 12 changed files with 521 additions and 80 deletions.
5 changes: 2 additions & 3 deletions taipy/core/data/_data_manager.py
@@ -33,11 +33,10 @@
 
 
 class _DataManager(_Manager[DataNode], _VersionMixin):
-    __DATA_NODE_CLASS_MAP = DataNode._class_map()  # type: ignore
+    _DATA_NODE_CLASS_MAP = DataNode._class_map()  # type: ignore
     _ENTITY_NAME = DataNode.__name__
     _EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
     _repository: _DataFSRepository
-    __NAME_KEY = "name"
 
     @classmethod
     def _bulk_get_or_create(
@@ -90,7 +89,7 @@ def __create(
         else:
             storage_type = Config.sections[DataNodeConfig.name][_Config.DEFAULT_KEY].storage_type
 
-        return cls.__DATA_NODE_CLASS_MAP[storage_type](
+        return cls._DATA_NODE_CLASS_MAP[storage_type](
            config_id=data_node_config.id,
            scope=data_node_config.scope or DataNodeConfig._DEFAULT_SCOPE,
            validity_period=data_node_config.validity_period,
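
The `__DATA_NODE_CLASS_MAP` → `_DATA_NODE_CLASS_MAP` rename (and the matching one in `_tabular_datanode_mixin.py` below) sidesteps Python's name mangling: a double-underscore class attribute is stored as `_DataManager__DATA_NODE_CLASS_MAP`, which makes it awkward to reach from subclasses or tests. A minimal sketch of that behavior, with invented class names:

```python
class _Base:
    __MANGLED = "hidden"   # stored on the class as _Base__MANGLED
    _PLAIN = "visible"     # stored under its own name


class _Child(_Base):
    @classmethod
    def peek(cls):
        # cls._PLAIN resolves normally through inheritance, while
        # cls.__MANGLED would compile to cls._Child__MANGLED here and
        # raise AttributeError: mangling uses the class where the name
        # appears, not the class where the attribute was defined.
        return cls._PLAIN


print(_Child.peek())  # visible
```
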
8 changes: 3 additions & 5 deletions taipy/core/data/_tabular_datanode_mixin.py
@@ -28,7 +28,7 @@ class _TabularDataNodeMixin(object):
     _EXPOSED_TYPE_PANDAS = "pandas"
     _EXPOSED_TYPE_POLARS = "polars"
     _EXPOSED_TYPE_MODIN = "modin"  # Deprecated in favor of pandas since 3.1.0
-    __VALID_STRING_EXPOSED_TYPES = [_EXPOSED_TYPE_PANDAS, _EXPOSED_TYPE_NUMPY, _EXPOSED_TYPE_POLARS]
+    _VALID_STRING_EXPOSED_TYPES = [_EXPOSED_TYPE_PANDAS, _EXPOSED_TYPE_NUMPY, _EXPOSED_TYPE_POLARS]
 
     def __init__(self, **kwargs) -> None:
         self._decoder: Union[Callable[[List[Any]], Any], Callable[[Dict[Any, Any]], Any]]
@@ -46,13 +46,11 @@ def __init__(self, **kwargs) -> None:
         if callable(custom_encoder):
             self._encoder = custom_encoder
 
-    def _convert_data_to_dataframe(self, exposed_type: Any, data: Any) -> Union[pd.DataFrame, pd.Series]:
+    def _convert_data_to_dataframe(self, exposed_type: Any, data: Any) -> Union[pd.DataFrame, pd.Series, pl.DataFrame]:
         if exposed_type == self._EXPOSED_TYPE_PANDAS and isinstance(data, (pd.DataFrame, pd.Series)):
             return data
         elif exposed_type == self._EXPOSED_TYPE_NUMPY and isinstance(data, np.ndarray):
             return pd.DataFrame(data)
-        elif exposed_type == self._EXPOSED_TYPE_POLARS and isinstance(data, (pl.DataFrame, pl.Series)):
-            return data
         elif isinstance(data, list) and not isinstance(exposed_type, str):
             return pd.DataFrame.from_records([self._encoder(row) for row in data])
         return pd.DataFrame(data)
@@ -70,7 +68,7 @@ def _get_valid_exposed_type(cls, properties: Dict):
 
     @classmethod
     def _check_exposed_type(cls, exposed_type):
-        valid_string_exposed_types = cls.__VALID_STRING_EXPOSED_TYPES
+        valid_string_exposed_types = cls._VALID_STRING_EXPOSED_TYPES
         if isinstance(exposed_type, str) and exposed_type not in valid_string_exposed_types:
             raise InvalidExposedType(
                 f"Invalid string exposed type {exposed_type}. Supported values are "
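
For context on `_convert_data_to_dataframe`: a non-string `exposed_type` denotes a custom class, so each record is passed through the configured encoder and assembled with `pd.DataFrame.from_records`. A rough standalone sketch of that branch, with an invented encoder:

```python
import pandas as pd


def encode(row):  # stand-in for a user-supplied custom encoder
    return {"x": row[0], "y": row[1]}


rows = [(1, "a"), (2, "b")]
df = pd.DataFrame.from_records([encode(r) for r in rows])
print(df)
#    x  y
# 0  1  a
# 1  2  b
```
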
62 changes: 31 additions & 31 deletions taipy/core/data/csv.py
@@ -15,7 +15,6 @@
 
 import numpy as np
 import pandas as pd
-import polars as pl
 
 from taipy.config.common.scope import Scope
 
@@ -136,11 +135,12 @@ def storage_type(cls) -> str:
         return cls.__STORAGE_TYPE
 
     def _read(self):
-        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
+        exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
+        if exposed_type == self._EXPOSED_TYPE_PANDAS:
             return self._read_as_pandas_dataframe()
-        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_POLARS:
-            return self._read_as_polars_dataframe()
-        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
+        # if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_POLARS:
+        #     return self._read_as_polars_dataframe()
+        if exposed_type == self._EXPOSED_TYPE_NUMPY:
             return self._read_as_numpy()
         return self._read_as()
 
@@ -173,22 +173,26 @@ def _read_as_pandas_dataframe(
         except pd.errors.EmptyDataError:
             return pd.DataFrame()
 
-    def _read_as_polars_dataframe(
-        self, usecols: Optional[List[int]] = None, column_names: Optional[List[str]] = None
-    ) -> pd.DataFrame:
-        try:
-            if self.properties[self._HAS_HEADER_PROPERTY]:
-                if column_names:
-                    return pl.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY], columns=column_names)
-                return pl.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY])
-            else:
-                if usecols:
-                    return pl.read_csv(
-                        self._path, encoding=self.properties[self.__ENCODING_KEY], header=None, usecols=usecols
-                    )
-                return pl.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY], header=None)
-        except pl.exceptions.NoDataError:
-            return pl.DataFrame()
+    # def _read_as_polars_dataframe(
+    #     self, usecols: Optional[List[int]] = None, column_names: Optional[List[str]] = None
+    # ) -> pd.DataFrame:
+    #     properties = self.properties
+
+    #     try:
+    #         if properties[self._HAS_HEADER_PROPERTY]:
+    #             if column_names:
+    #                 return pl.read_csv(
+    #                     self._path, encoding=properties[self.__ENCODING_KEY], columns=column_names, has_header=True
+    #                 )
+    #             return pl.read_csv(self._path, encoding=properties[self.__ENCODING_KEY], has_header=True)
+    #         else:
+    #             if usecols:
+    #                 return pl.read_csv(
+    #                     self._path, encoding=properties[self.__ENCODING_KEY], has_header=False, usecols=usecols
+    #                 )
+    #             return pl.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY], has_header=False)
+    #     except pl.exceptions.NoDataError:
+    #         return pl.DataFrame()
 
     def _append(self, data: Any):
         if isinstance(data, pd.DataFrame):
@@ -198,16 +202,15 @@ def _append(self, data: Any):
             self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False
         )
 
-    def _write(self, data: Any):
+    def _write(self, data: Any, columns: Optional[List[str]] = None):
         exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
-        header = True if self.properties[self._HAS_HEADER_PROPERTY] else None
 
-        if exposed_type == self._EXPOSED_TYPE_POLARS and isinstance(data, (pl.DataFrame, pl.Series)):
-            data.write_csv(self._path, include_header=header or False)
-        else:
-            if self.properties[self._HAS_HEADER_PROPERTY]:
-                self._convert_data_to_dataframe(exposed_type, data).to_csv(
-                    self._path, index=False, encoding=self.properties[self.__ENCODING_KEY]
-                )
+        self._convert_data_to_dataframe(exposed_type, data).to_csv(
+            self._path, index=False, encoding=self.properties[self.__ENCODING_KEY], header=None
+        )
 
     def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None, job_id: Optional[JobId] = None):
         """Write a selection of columns.
@@ -217,8 +220,5 @@ def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None
             columns (Optional[List[str]]): The list of column names to write.
             job_id (JobId^): An optional identifier of the writer.
         """
-        df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)
-        if columns and isinstance(df, pd.DataFrame):
-            df.columns = columns
-        df.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
+        self._write(data, columns)
         self.track_edit(timestamp=datetime.now(), job_id=job_id)
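
After this change, `write_with_column_names` is a thin wrapper that delegates to `_write(data, columns)` and then records the edit. A hedged usage sketch, reusing the construction pattern from the tests below (the path, config id, and column names are invented):

```python
import pandas as pd

from taipy.config.common.scope import Scope
from taipy.core.data.csv import CSVDataNode

dn = CSVDataNode(
    "prices",
    Scope.SCENARIO,
    properties={"path": "out.csv", "has_header": True, "exposed_type": "pandas"},
)

# Delegates to _write(data, columns), then tracks the edit with a timestamp.
dn.write_with_column_names(pd.DataFrame([[1, 2], [3, 4]]), columns=["price", "qty"])
```
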
32 changes: 21 additions & 11 deletions taipy/core/data/excel.py
@@ -68,6 +68,7 @@ class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
 
     __STORAGE_TYPE = "excel"
     __SHEET_NAME_PROPERTY = "sheet_name"
+    __EXCEL_ENGINE = "openpyxl"
 
     _REQUIRED_PROPERTIES: List[str] = []
 
@@ -229,19 +230,22 @@ def _do_read_excel(self, sheet_names, kwargs) -> Union[Dict[Union[int, str], pd.
 
     def __get_sheet_names_and_header(self, sheet_names):
         kwargs = {}
+        properties = self.properties
         if sheet_names is None:
-            sheet_names = self.properties[self.__SHEET_NAME_PROPERTY]
-        if not self.properties[self._HAS_HEADER_PROPERTY]:
+            sheet_names = properties[self.__SHEET_NAME_PROPERTY]
+        if not properties[self._HAS_HEADER_PROPERTY]:
             kwargs["header"] = None
         return sheet_names, kwargs
 
     def _read_as_polars_dataframe(self, sheet_names=None) -> Union[Dict[Union[int, str], pl.DataFrame], pl.DataFrame]:
         sheet_names, kwargs = self.__get_sheet_names_and_header(sheet_names)
         try:
             if sheet_names:
-                return pl.read_excel(self._path, sheet_name=sheet_names, **kwargs)
+                return pl.read_excel(
+                    self._path, sheet_name=sheet_names, engine=self.__EXCEL_ENGINE, read_options=kwargs
+                )
             else:
-                return pl.read_excel(self._path, sheet_id=0, **kwargs)
+                return pl.read_excel(self._path, sheet_id=0, engine=self.__EXCEL_ENGINE, read_options=kwargs)
         except pl.exceptions.NoDataError:
             return pl.DataFrame()
 
@@ -255,7 +259,7 @@ def _read_as_pandas_dataframe(self, sheet_names=None) -> Union[Dict[Union[int, s
     def __append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
         sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)
 
-        with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
+        with pd.ExcelWriter(self._path, mode="a", engine=self.__EXCEL_ENGINE, if_sheet_exists="overlay") as writer:
             if sheet_name:
                 if not isinstance(sheet_name, str):
                     sheet_name = sheet_name[0]
@@ -267,7 +271,7 @@ def __append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
             append_excel_fct(writer, *args, **kwargs, startrow=writer.sheets[sheet_name].max_row)
 
     def __append_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
-        with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
+        with pd.ExcelWriter(self._path, mode="a", engine=self.__EXCEL_ENGINE, if_sheet_exists="overlay") as writer:
             # Each key stands for a sheet name
             for sheet_name in data.keys():
                 if isinstance(data[sheet_name], np.ndarray):
@@ -298,9 +302,9 @@ def _append(self, data: Any):
                 self.__append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)
 
     def __write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
-        if (
-            sheet_name := self.properties.get(self.__SHEET_NAME_PROPERTY)
-            and self.properties[self._EXPOSED_TYPE_PROPERTY] != self._EXPOSED_TYPE_POLARS
+        properties = self.properties
+        if (sheet_name := properties.get(self.__SHEET_NAME_PROPERTY)) and (
+            properties[self._EXPOSED_TYPE_PROPERTY] != self._EXPOSED_TYPE_POLARS
         ):
             if not isinstance(sheet_name, str):
                 if len(sheet_name) > 1:
@@ -319,6 +323,7 @@ def __write_excel_with_multiple_sheets(self, data: Dict, columns: List[str] = No
         ):
             with Workbook(self._path) as wb:
                 for key, df in data.items():
+                    df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data[key])
                     if columns:
                         df.columns = columns
                     df.write_excel(wb, worksheet=key)
@@ -337,10 +342,15 @@ def _write(self, data: Any):
         if isinstance(data, Dict):
             return self.__write_excel_with_multiple_sheets(data)
         else:
-            data = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)
+            properties = self.properties
+            data = self._convert_data_to_dataframe(properties[self._EXPOSED_TYPE_PROPERTY], data)
             if isinstance(data, pl.DataFrame):
+                work_sheet = properties[self.__SHEET_NAME_PROPERTY] or "Sheet1"
                 self.__write_excel_with_single_sheet(
-                    data.write_excel, self._path, include_header=self.properties[self._HAS_HEADER_PROPERTY] or False
+                    data.write_excel,
+                    self._path,
+                    worksheet=work_sheet,
+                    include_header=self.properties[self._HAS_HEADER_PROPERTY] or False,
                 )
             else:
                 self.__write_excel_with_single_sheet(
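
The repeated `pd.ExcelWriter(...)` calls that the new `__EXCEL_ENGINE` constant deduplicates follow pandas' append-with-overlay pattern. A minimal standalone sketch; the file name and sheet name are invented, and the workbook must already exist for mode="a":

```python
import pandas as pd

df = pd.DataFrame({"a": [3, 4]})

# Reopen an existing workbook and write below the last used row of "Data".
with pd.ExcelWriter("report.xlsx", mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
    df.to_excel(
        writer,
        sheet_name="Data",
        index=False,
        header=False,
        startrow=writer.sheets["Data"].max_row,  # openpyxl worksheet row count
    )
```
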
37 changes: 22 additions & 15 deletions taipy/core/data/parquet.py
@@ -156,7 +156,12 @@ def __init__(
 
         self._write_default_data(default_value)
 
-        if not self._last_edit_date and (isfile(self._path) or isdir(self._path)):  # type: ignore
+        if (
+            not self._last_edit_date
+            and (  # type: ignore
+                isfile(self._path) or isdir(self._path[:-1] if self._path.endswith("*") else self._path)
+            )
+        ):
             self._last_edit_date = datetime.now()
         self._TAIPY_PROPERTIES.update(
             {
@@ -191,6 +196,8 @@ def _read_as_pandas_dataframe(self, read_kwargs: Dict) -> pd.DataFrame:
         return pd.read_parquet(self._path, **read_kwargs)
 
     def _read_as_polars_dataframe(self, read_kwargs: Dict) -> pl.DataFrame:
+        if read_kwargs.pop(self.__ENGINE_PROPERTY) == "pyarrow":
+            return pl.read_parquet(self._path, use_pyarrow=True, **read_kwargs)
         return pl.read_parquet(self._path, **read_kwargs)
 
     def _append(self, data: Any):
@@ -215,8 +222,10 @@ def write_with_kwargs(self, data: Any, job_id: Optional[JobId] = None, **write_k
         else:
             df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)
 
-        if isinstance(df, (pl.DataFrame, pl.Series)):
+        if isinstance(df, pl.DataFrame):
             df.write_parquet(self._path)
+        elif isinstance(df, pl.Series):
+            df.to_frame().write_parquet(self._path)
         else:
             kwargs = {
                 self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
@@ -247,19 +256,17 @@ def read_with_kwargs(self, **read_kwargs):
             return None
 
         kwargs = self.properties[self.__READ_KWARGS_PROPERTY]
-        kwargs.update(
-            {
-                self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
-            }
-        )
-        kwargs.update(read_kwargs)
-
-        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
-            return self._read_as_pandas_dataframe(kwargs)
-        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
-            return self._read_as_numpy(kwargs)
-        return self._read_as(kwargs)
+        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_POLARS:
+            return self._read_as_polars_dataframe(kwargs)
+        else:
+            kwargs.update(
+                {
+                    self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
+                }
+            )
+            kwargs.update(read_kwargs)
+
+            if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
+                return self._read_as_pandas_dataframe(kwargs)
+            if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
+                return self._read_as_numpy(kwargs)
+            return self._read_as(kwargs)
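
The polars branch above now chooses between polars' native parquet reader and the pyarrow-backed one based on the configured engine, and a written `pl.Series` is wrapped into a one-column frame first. An illustration with invented file names:

```python
import polars as pl

# polars' native reader:
df = pl.read_parquet("data.parquet")

# the same read routed through pyarrow, as when the engine property is "pyarrow":
df = pl.read_parquet("data.parquet", use_pyarrow=True)

# mirroring the write path for a Series:
pl.Series("a", [1, 2, 3]).to_frame().write_parquet("series.parquet")
```
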
21 changes: 21 additions & 0 deletions tests/core/data/test_read_csv_data_node.py
@@ -15,6 +15,7 @@
 
 import numpy as np
 import pandas as pd
+import polars as pl
 import pytest
 
 from taipy.config.common.scope import Scope
@@ -56,6 +57,16 @@ def test_read_with_header_numpy():
     assert np.array_equal(data_numpy, pd.read_csv(csv_file_path).to_numpy())
 
 
+def test_read_with_header_polars():
+    csv_data_node_as_polars = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": csv_file_path, "has_header": True, "exposed_type": "polars"}
+    )
+    data_polars = csv_data_node_as_polars.read()
+    assert isinstance(data_polars, pl.DataFrame)
+    assert len(data_polars) == 10
+    assert np.array_equal(data_polars, pl.read_csv(csv_file_path))
+
+
 def test_read_with_header_custom_exposed_type():
     data_pandas = pd.read_csv(csv_file_path)
 
@@ -100,6 +111,16 @@ def test_read_without_header_numpy():
     assert np.array_equal(data_numpy, pd.read_csv(csv_file_path, header=None).to_numpy())
 
 
+def test_read_without_header_polars():
+    csv_data_node_as_polars = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": csv_file_path, "has_header": False, "exposed_type": "polars"}
+    )
+    data_polars = csv_data_node_as_polars.read()
+    assert isinstance(data_polars, pl.DataFrame)
+    assert len(data_polars) == 11
+    assert pl.DataFrame.equals(data_polars, pl.read_csv(csv_file_path, has_header=False))
+
+
 def test_read_without_header_custom_exposed_type():
     csv_data_node_as_custom_object = CSVDataNode(
         "quux", Scope.SCENARIO, properties={"path": csv_file_path, "has_header": False, "exposed_type": MyCustomObject}
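
A detail worth noting in the two new tests: the same CSV yields 10 rows with has_header=True but 11 with has_header=False, because the header line is then consumed as an ordinary data row. A standalone sketch with an invented file name:

```python
import polars as pl

with_header = pl.read_csv("example.csv")                       # first line becomes column names
without_header = pl.read_csv("example.csv", has_header=False)  # first line becomes a data row
assert len(without_header) == len(with_header) + 1
```
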
(The remaining 6 changed files are not shown here.)

0 comments on commit 612ef76
