Skip to content

Commit

Permalink
Make MeasuredData inherit from pandas DataFrame
Browse files Browse the repository at this point in the history
We want this to enable a more familiar interaction with MeasuredData
for anyone who is used to a pandas DataFrame. It also simplifies all
the operations by always returning a DataFrame.
  • Loading branch information
oyvindeide committed Feb 4, 2021
1 parent f00e180 commit a1094cd
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 106 deletions.
235 changes: 129 additions & 106 deletions ert_data/measured.py
Original file line number Diff line number Diff line change
@@ -1,166 +1,189 @@
from collections import defaultdict

import deprecation
import pandas as pd
import numpy as np

from ert_data import loader

# importlib.metadata added in python 3.8
try:
from importlib import metadata
__version__ = metadata.version("ert")
except ImportError:
from pkg_resources import get_distribution
__version__ = get_distribution("ert").version

class MeasuredData(pd.DataFrame):
    """
    A pandas DataFrame holding observations together with simulated responses.

    The rows labelled "OBS" and "STD" are treated as the observed values and
    their standard deviations; every other row is treated as simulated data.
    The filtering and removal methods return a new object and leave ``self``
    unmodified.
    """

    def __init__(self, facade, keys, load_data=True):
        """
        This is a bit ugly, but is close to the way pandas does this internally.
        The problem is that when an operation is performed on a DataFrame, a view of
        the dataframe is returned. We want that returned dataframe to be a MeasuredData
        object, but we don't want to read from libres each time, so we need to have a
        variable constructor.

        :param facade: either the facade used to load the data, or a pandas
            BlockManager when pandas itself re-creates the object.
        :param keys: observation keys to load; unused when ``facade`` is a
            BlockManager.
        :param load_data: forwarded to ``_get_data`` when loading via a facade.
        """
        if isinstance(facade, pd.core.internals.BlockManager):
            # pandas-internal construction path: wrap the existing blocks as-is.
            super().__init__(facade)
        else:
            df = _get_data(facade, keys, load_data)
            super().__init__(data=df.values, columns=df.columns, index=df.index)

    @property
    def _constructor(self):
        """
        Part of the pandas API
        """
        return MeasuredData._internal_ctor

    @classmethod
    def _internal_ctor(cls, *args, **kwargs):
        """
        This is a part of creating the constructor when pandas returns a subframe
        we just insert None as the keys (it can be anything as we don't use it)
        """
        # NOTE(review): any other keyword arguments pandas passes are forwarded
        # unchanged to __init__, which only accepts facade/keys/load_data --
        # confirm this holds for the supported pandas versions.
        kwargs["keys"] = None
        return cls(*args, **kwargs)

    @property
    def response(self):
        """The simulated rows, i.e. every row except "OBS" and "STD"."""
        return self._get_simulated_data()

    @property
    def observations(self):
        """The "OBS" and "STD" rows."""
        return self.loc[["OBS", "STD"]]

    # ``property`` must be the outermost decorator: the deprecation decorator
    # wraps a callable, and wrapping the property object itself would turn
    # ``data`` into a method whose call fails.
    @property
    @deprecation.deprecated(
        deprecated_in="2.19",
        removed_in="3.0",
        current_version=__version__,
        details="Use self instead",
    )
    def data(self):
        """Deprecated accessor kept for backwards compatibility; returns ``self``."""
        return self

    def remove_failed_realizations(self):
        """Return a copy where rows without any simulated data are removed."""
        return self._remove_failed_realizations()

    @deprecation.deprecated(
        deprecated_in="2.19",
        removed_in="3.0",
        current_version=__version__,
        details="Use the response property instead",
    )
    def get_simulated_data(self):
        """Deprecated method form of the ``response`` property."""
        return self._get_simulated_data()

    def _get_simulated_data(self):
        """Select every row whose label is neither "OBS" nor "STD"."""
        return self[~self.index.isin(["OBS", "STD"])]

    def _remove_failed_realizations(self):
        """Removes rows with no simulated data, leaving observations and
        standard deviations as-is."""
        pre_index = self.index
        post_index = list(self.dropna(axis=0, how="all").index)
        # "STD" and "OBS" are added to the kept labels so they are never dropped.
        drop_index = set(pre_index) - set(post_index + ["STD", "OBS"])
        return self.drop(index=list(drop_index))

    def remove_inactive_observations(self):
        """Return a copy where columns containing NaN values are removed.

        :raises ValueError: if no data is left after the removal.
        """
        return self._remove_inactive_observations()

    def _remove_inactive_observations(self):
        """Removes columns with one or more NaN values."""
        filtered_dataset = self.dropna(axis=1)
        if filtered_dataset.empty:
            raise ValueError(
                "This operation results in an empty dataset (could be due to one or more failed realizations)"
            )
        return filtered_dataset

    @deprecation.deprecated(
        deprecated_in="2.19",
        removed_in="3.0",
        current_version=__version__,
        details="Use the empty property instead",
    )
    def is_empty(self):
        """Deprecated method form of the ``empty`` property."""
        return self.empty

    def filter_ensemble_std(self, std_cutoff):
        """Return a copy without the columns whose ensemble standard deviation
        is less than or equal to ``std_cutoff``."""
        return self._filter_ensemble_std(std_cutoff)

    def filter_ensemble_mean_obs(self, alpha):
        """Return a copy without the columns whose ensemble mean is too far
        from the observed value, see ``_filter_ensemble_mean_obs``."""
        return self._filter_ensemble_mean_obs(alpha)

    def _filter_ensemble_std(self, std_cutoff):
        """
        Filters on ensemble variation versus a user defined standard
        deviation cutoff. If there is not enough variation in the measurements
        the data point is removed.
        """
        ens_std = self.response.std()
        std_filter = ens_std <= std_cutoff
        # Indexing the boolean series with itself keeps only the True entries,
        # whose labels are the columns to drop.
        return self.drop(columns=std_filter[std_filter].index)

    def _filter_ensemble_mean_obs(self, alpha):
        """
        Filters on distance between the observed data and the ensemble mean
        based on variation and a user defined alpha.
        """
        ens_mean = self.response.mean()
        ens_std = self.response.std()
        obs_values = self.loc["OBS"]
        obs_std = self.loc["STD"]

        mean_filter = abs(obs_values - ens_mean) > alpha * (ens_std + obs_std)

        return self.drop(columns=mean_filter[mean_filter].index)

    def filter_on_column_index(self, obs_keys, index_lists):
        """
        Select columns by observation key and "data_index" column level.

        :param obs_keys: observation keys, paired position by position with
            ``index_lists``.
        :param index_lists: one list of data indices (or None) per key. If it
            is None, or every entry is None, ``self`` is returned unchanged.
        :return: the columns matching any (key, index) pair.
        """
        if index_lists is None or all(index_list is None for index_list in index_lists):
            return self
        names = self.columns.get_level_values(0)
        data_index = self.columns.get_level_values("data_index")
        cond = _create_condition(names, data_index, obs_keys, index_lists)
        return self.iloc[:, cond]


def _create_condition(names, data_index, obs_keys, index_lists):
    """
    Build a boolean column mask from pairs of observation key and index list.

    A column is selected when its name equals an observation key and its data
    index is in the index list paired with that key. Keys whose index list is
    None contribute no columns to the mask.

    :param names: array-like with one name per column.
    :param data_index: array-like with one data index per column.
    :param obs_keys: observation keys, paired with ``index_lists`` by position.
    :param index_lists: one iterable of data indices, or None, per key.
    :return: boolean mask with one entry per element of ``names``.
    """
    # Start from an all-False mask so the result always has one entry per
    # column, also when every index list is None.
    mask = np.zeros(len(names), dtype=bool)
    for obs_key, index_list in zip(obs_keys, index_lists):
        if index_list is None:
            continue
        index_cond = np.logical_or.reduce(
            [data_index == index for index in index_list]
        )
        key_cond = np.logical_and(index_cond, (names == obs_key))
        mask = np.logical_or(mask, key_cond)
    return mask


def _get_data(facade, observation_keys, load_data=True):
    """
    Adds simulated and observed data and returns a dataframe where ensemble
    members will have a data key, observed data will be named OBS and
    observed standard deviation will be named STD.

    :param facade: object providing ``get_current_case_name``,
        ``get_data_key_for_obs_key`` and ``get_impl_type_name_for_obs_key``.
    :param observation_keys: iterable of observation keys to load.
    :param load_data: forwarded unchanged to the data loader.
    :return: the per-response dataframes concatenated along the columns.
    :raises ValueError: if ``observation_keys`` is empty.
    :raises AssertionError: if observations linked to the same response have
        different observation types.
    """
    case_name = facade.get_current_case_name()

    # Because several observations can be linked to the same response we create
    # a grouping to avoid reading the same response for each of the corresponding
    # observations, as that is quite slow.
    key_map = defaultdict(list)
    for key in observation_keys:
        data_key = facade.get_data_key_for_obs_key(key)
        key_map[data_key].append(key)

    if not key_map:
        # pd.concat would raise an unspecific ValueError on an empty list;
        # raise the same exception type with a message naming the cause.
        raise ValueError("No observation keys given, there is no data to load")

    measured_data = []

    for obs_keys in key_map.values():
        obs_types = {facade.get_impl_type_name_for_obs_key(key) for key in obs_keys}
        if len(obs_types) != 1:
            # Raised explicitly (instead of using an assert statement) so the
            # check is still performed when Python runs with -O.
            raise AssertionError(
                f"\nMore than one observation type found for obs keys: {obs_keys}"
            )
        (observation_type,) = obs_types
        data_loader = loader.data_loader_factory(observation_type)
        measured_data.append(data_loader(facade, obs_keys, case_name, load_data))

    return pd.concat(measured_data, axis=1)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def package_files(directory):
"cloudevents",
"console-progressbar==1.1.2",
"decorator",
"deprecation",
"equinor-libres >= 8.0.0b0",
"fastapi",
"jinja2",
Expand Down

0 comments on commit a1094cd

Please sign in to comment.