Use shared memory to write partial DataFrames (#33)
- Improve the performance of report extraction and features calculation by writing partial DataFrames to shared memory (or to a temp directory if shared memory is not available); see the sketch after this list.
- Use zstd compression instead of snappy when writing parquet files.
- When the repo is pickled, extract the DataFrames only if they aren't already stored in the cache.
- Remove the fastparquet extra dependency.
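
The shared-memory trick in the first bullet boils down to: create the working directory under a shared-memory mount when one is available, fall back to an ordinary temp directory otherwise, and let each task dump its partial DataFrame there as a parquet file. The following is only a minimal sketch of that idea, assuming pyarrow is installed; guess_shmdir, write_partial, and the zero-padded file name are illustrative stand-ins, not blueetl's actual API.

import tempfile
from pathlib import Path
from typing import Optional

import pandas as pd


def guess_shmdir() -> Optional[Path]:
    """Return a shared-memory directory if one exists, otherwise None (hypothetical helper)."""
    shm = Path("/dev/shm")
    return shm if shm.is_dir() else None


def write_partial(df: pd.DataFrame, folder: Path, task_index: int) -> None:
    """Write one task's partial DataFrame as a zero-padded parquet file."""
    df.to_parquet(folder / f"{task_index:08d}.parquet")


# when guess_shmdir() returns None, TemporaryDirectory falls back to the
# default temp location, mirroring the "temp directory" fallback above
with tempfile.TemporaryDirectory(prefix="blueetl_", dir=guess_shmdir()) as tmp:
    for i, part in enumerate([pd.DataFrame({"x": [1, 2]}), pd.DataFrame({"x": [3]})]):
        write_partial(part, Path(tmp), i)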
GianlucaFicarelli committed Apr 24, 2024
1 parent 5602fd3 commit c2861df
Showing 14 changed files with 416 additions and 218 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,18 @@
Changelog
=========

Version 0.9.1
-------------

Improvements
~~~~~~~~~~~~

- Improve the performance of report extraction and features calculation by writing partial DataFrames to shared memory (or to a temp directory if shared memory is not available).
  Both memory usage and execution time should be lower than before when processing large DataFrames.
- Use zstd compression instead of snappy when writing parquet files.
- When ``repo`` is pickled, extract the DataFrames only if they aren't already stored in the cache (see the sketch after this list).
- Remove the fastparquet extra dependency.
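
The third improvement is not visible in the files shown below. The following is only a hypothetical illustration of the "extract on pickle only when not cached" pattern, with invented class and attribute names; blueetl's real repository and cache objects are not shown in this commit view.

import pandas as pd


class Repo:
    """Hypothetical repository that extracts DataFrames lazily and caches them."""

    def __init__(self) -> None:
        self._cache: dict[str, pd.DataFrame] = {}

    def extract_all(self) -> None:
        """Expensive extraction step, executed only when nothing is cached yet."""
        self._cache["report"] = pd.DataFrame({"value": [1.0, 2.0]})

    def __getstate__(self) -> dict:
        # on pickling, make sure the DataFrames exist, but skip the
        # extraction when they are already stored in the cache
        if not self._cache:
            self.extract_all()
        return self.__dict__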

Version 0.9.0
-------------

Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
@@ -41,8 +41,6 @@ dynamic = ["version"]
 [project.optional-dependencies]
 extra = [
     # extra requirements that may be dropped at some point
-    "fastparquet>=0.8.3,!=2023.1.0", # needed by pandas to read and write parquet files
-    "orjson", # faster json decoder used by fastparquet
     "tables>=3.6.1", # needed by pandas to read and write hdf files
 ]
 external = [
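
With the fastparquet extra and its orjson helper dropped, parquet reads and writes presumably go through pyarrow, which also provides the zstd codec mentioned in the changelog. A small sketch of that round trip, assuming pyarrow is installed; the file name and data are illustrative.

import pandas as pd

df = pd.DataFrame({"gid": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

# zstd usually compresses noticeably better than the snappy default,
# at a modest extra CPU cost
df.to_parquet("values.parquet", engine="pyarrow", compression="zstd")

restored = pd.read_parquet("values.parquet", engine="pyarrow")
assert restored.equals(df)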
83 changes: 55 additions & 28 deletions src/blueetl/extract/report.py
@@ -1,9 +1,12 @@
 """Generic Report extractor."""
 
 import logging
+import tempfile
 from abc import ABCMeta, abstractmethod
 from dataclasses import dataclass
-from typing import NamedTuple, Optional, TypeVar
+from functools import partial
+from pathlib import Path
+from typing import Callable, NamedTuple, Optional, TypeVar
 
 import pandas as pd
 from blueetl_core.utils import smart_concat
@@ -16,6 +19,8 @@
 from blueetl.extract.simulations import Simulations
 from blueetl.extract.windows import Windows
 from blueetl.parallel import merge_filter
+from blueetl.store.parquet import ParquetStore
+from blueetl.utils import ensure_dtypes, get_shmdir, timed
 
 L = logging.getLogger(__name__)
 ReportExtractorT = TypeVar("ReportExtractorT", bound="ReportExtractor")
@@ -96,33 +101,55 @@ def from_simulations(
         Returns:
             New instance.
         """
-
-        def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]:
-            # executed in a subprocess
-            simulations_df, neurons_df, windows_df = df_list
-            simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]]
-            assert simulation_id == key.simulation_id  # type: ignore[attr-defined]
-            df_list = []
-            for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]):
-                population = neuron_classes.df.etl.one(
-                    circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class
-                )[POPULATION]
-                result_df = cls._load_values(
-                    simulation=simulation,
-                    population=population,
-                    gids=df[GID],
-                    windows_df=windows_df,
-                    name=name,
-                )
-                result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key]
-                df_list.append(result_df)
-            return smart_concat(df_list, ignore_index=True)
-
-        all_df = merge_filter(
-            df_list=[simulations.df, neurons.df, windows.df],
-            groupby=[SIMULATION_ID, CIRCUIT_ID],
-            func=_func,
-            parallel=True,
-        )
-        df = smart_concat(all_df, ignore_index=True)
-        return cls(df, cached=False, filtered=False)
+        with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as _temp_folder:
+            with timed(L.info, "Executing merge_filter "):
+                func = partial(
+                    _merge_filter_func,
+                    temp_folder=Path(_temp_folder),
+                    name=name,
+                    neuron_classes_df=neuron_classes.df,
+                    dataframe_builder=cls._load_values,
+                )
+                merge_filter(
+                    df_list=[simulations.df, neurons.df, windows.df],
+                    groupby=[SIMULATION_ID, CIRCUIT_ID],
+                    func=func,
+                )
+            with timed(L.info, "Executing concatenation"):
+                df = ParquetStore(Path(_temp_folder)).load()
+                df = ensure_dtypes(df)
+        return cls(df, cached=False, filtered=False)
+
+
+def _merge_filter_func(
+    task_index: int,
+    key: NamedTuple,
+    df_list: list[pd.DataFrame],
+    temp_folder: Path,
+    name: str,
+    neuron_classes_df: pd.DataFrame,
+    dataframe_builder: Callable[..., pd.DataFrame],
+) -> None:
+    """Executed in a subprocess, write a partial DataFrame to temp_folder."""
+    # pylint: disable=too-many-locals
+    simulations_df, neurons_df, windows_df = df_list
+    simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]]
+    assert simulation_id == key.simulation_id  # type: ignore[attr-defined]
+    df_list = []
+    for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]):
+        population = neuron_classes_df.etl.one(
+            circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class
+        )[POPULATION]
+        result_df = dataframe_builder(
+            simulation=simulation,
+            population=population,
+            gids=df[GID],
+            windows_df=windows_df,
+            name=name,
+        )
+        result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key]
+        df_list.append(result_df)
+    result_df = smart_concat(df_list, ignore_index=True)
+    # the conversion to the desired dtype here is important to reduce memory usage and cpu time
+    result_df = ensure_dtypes(result_df)