Complete redesign of "Observations" object enabling introduction of minibatching #384

Merged Jul 31, 2024

Changes from 25 commits

Commits (32)
586c48a
update getters for obs and obs_noise_cov
odunbar Jun 17, 2024
1c02b0e
remove arg from etki constructor
odunbar Jun 25, 2024
50c018b
commas to semicolons
odunbar Jun 26, 2024
5c3e30c
performance improvements and bug-fix, works well with new Observation…
odunbar Jun 26, 2024
2358431
add comment
odunbar Jun 26, 2024
de2a431
format
odunbar Jun 26, 2024
73bdd0d
adds average update time
odunbar Jun 26, 2024
57cbd6d
new docs page
odunbar Jun 26, 2024
1c493bb
example block fixed
odunbar Jun 26, 2024
68507bf
new blocks
odunbar Jun 26, 2024
a5a137f
new blocks
odunbar Jun 26, 2024
0b7276d
add kwargs...
odunbar Jun 27, 2024
81952ff
format
odunbar Jun 27, 2024
460e797
new API docstrings
odunbar Jun 27, 2024
58a5b03
format api
odunbar Jun 27, 2024
cc33614
reduce test time, and convert all UniformScalings to Diagonals
odunbar Jul 11, 2024
a2e293d
format
odunbar Jul 11, 2024
c13462a
docs subtitle and move up observations
odunbar Jul 11, 2024
c74716c
rm "noise"
odunbar Jul 15, 2024
f1d90f2
clearer N_obs in constructor
odunbar Jul 15, 2024
c8983b4
note on identifiers
odunbar Jul 15, 2024
c124e3d
compatible learning rate schedulers
odunbar Jul 17, 2024
09962fd
remove build of inverse covariance to accelerate ETKI back to linear …
odunbar Jul 17, 2024
8069dbe
add timer check to ETKI
odunbar Jul 17, 2024
d769eff
format
odunbar Jul 17, 2024
a225c6b
another constructor option
odunbar Jul 26, 2024
8e23b32
addresses review comments
odunbar Jul 26, 2024
e3a591c
cleaned up ETKI look, by allocating buffers first and removing logic
odunbar Jul 26, 2024
d3d2a69
add comment
odunbar Jul 26, 2024
c33c673
typo docs
odunbar Jul 26, 2024
9556020
neaten format
odunbar Jul 26, 2024
3327533
Merge branch 'main' into orad/minibatch
odunbar Jul 30, 2024
4 changes: 2 additions & 2 deletions docs/make.jl
@@ -79,11 +79,11 @@ pages = [
"Unscented Kalman Inversion" => "unscented_kalman_inversion.md",
"Learning rate schedulers" => "learning_rate_scheduler.md",
"Prior distributions" => "parameter_distributions.md",
"Internal data representation" => "internal_data_representation.md",
"Observations and Minibatching" => "observations.md",
"Localization and SEC" => "localization.md",
"Inflation" => "inflation.md",
"Parallelism and HPC" => "parallel_hpc.md",
"Observations" => "observations.md",
"Internal data representation" => "internal_data_representation.md",
"Troubleshooting" => "troubleshooting.md",
"API" => api,
"Contributing" => "contributing.md",
42 changes: 40 additions & 2 deletions docs/src/API/Observations.md
@@ -1,10 +1,48 @@
# Observations

```@meta
CurrentModule = EnsembleKalmanProcesses.Observations
CurrentModule = EnsembleKalmanProcesses
```

## Observation
```@docs
Observation
get_samples
get_covs
get_inv_covs
get_names
get_indices
combine_observations
get_obs(o::OB) where {OB <: Observation}
get_obs_noise_cov(o::OB) where {OB <: Observation}
get_obs_noise_cov_inv(o::OB) where {OB <: Observation}
```
## Minibatcher
```@docs
FixedMinibatcher
no_minibatcher
create_new_epoch!(m::FM) where {FM <: FixedMinibatcher}
get_minibatches(m::FM) where {FM <: FixedMinibatcher}
get_method(m::FM) where {FM <: FixedMinibatcher}
get_rng(m::FM) where {FM <: FixedMinibatcher}
```

```@docs
RandomFixedSizeMinibatcher
get_minibatch_size(m::RFSM) where {RFSM <: RandomFixedSizeMinibatcher}
get_method(m::RFSM) where {RFSM <: RandomFixedSizeMinibatcher}
get_rng(m::RFSM) where {RFSM <: RandomFixedSizeMinibatcher}
get_minibatches(m::RFSM) where {RFSM <: RandomFixedSizeMinibatcher}
```
## ObservationSeries
```@docs
ObservationSeries
get_observations(os::OS) where {OS <: ObservationSeries}
get_minibatches(os::OS) where {OS <: ObservationSeries}
get_current_minibatch_index(os::OS) where {OS <: ObservationSeries}
get_minibatcher(os::OS) where {OS <: ObservationSeries}
update_minibatch!(os::OS) where {OS <: ObservationSeries}
get_current_minibatch(os::OS) where {OS <: ObservationSeries}
get_obs(os::OS) where {OS <: ObservationSeries}
get_obs_noise_cov(os::OS) where {OS <: ObservationSeries}
get_obs_noise_cov_inv(os::OS) where {OS <: ObservationSeries}
```
4 changes: 2 additions & 2 deletions docs/src/ensemble_kalman_inversion.md
@@ -180,7 +180,7 @@ The major disadvantage of ETKI is that it cannot be used with localization or sa

## Using ETKI

An ETKI struct can be created using the `EnsembleKalmanProcess` constructor by specifying the `TransformInversion` process type, with ``\Gamma^{-1}`` passed as an argument:
An ETKI struct can be created using the `EnsembleKalmanProcess` constructor by specifying the `TransformInversion` process type:

```julia
using EnsembleKalmanProcesses
@@ -190,7 +190,7 @@ J = 50 # number of ensemble members
initial_ensemble = construct_initial_ensemble(prior, J) # Initialize ensemble from prior

ekiobj = EnsembleKalmanProcess(initial_ensemble, y, obs_noise_cov,
TransformInversion(inv(obs_noise_cov)))
TransformInversion())
```

The rest of the inversion process is the same as for regular EKI.
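
For reference, the subsequent iteration loop might look like the following sketch, where `G` is a user-defined forward map from a parameter vector to the observation space (the names `G`, `N_iterations`, and `prior` are illustrative and not part of the diff above):

```julia
N_iterations = 10
for i in 1:N_iterations
    params_i = get_ϕ_final(prior, ekiobj)              # constrained parameters, N_par × J
    g_ens = hcat([G(params_i[:, j]) for j in 1:J]...)  # forward map evaluated on each ensemble member
    update_ensemble!(ekiobj, g_ens)                    # one ETKI update
end
```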
2 changes: 1 addition & 1 deletion docs/src/examples/Cloudy_example.md
@@ -114,7 +114,7 @@ for i in 1:n_samples
y_t[:, i] = G_t .+ rand(MvNormal(μ, Γy))
end

truth = Observations.Observation(y_t, Γy, data_names)
truth = Observation(y_t, Γy, data_names)
```


3 changes: 3 additions & 0 deletions docs/src/glossary.md
@@ -26,3 +26,6 @@ on GitHub.
| Observation map | ``\mathcal{H}`` | `H` |
| Prior covariance (unconstrained space) | ``\Gamma_{\theta}`` | `prior_cov` |
| Prior mean (unconstrained space) | ``m_\theta`` | `prior_mean` |

!!! note "On batching"
    When observations or parameters are being batched, their size (e.g., `N_obs` or `N_par`) refers to the total size of the elements in the current batch, summed over the batch. For example, when calibrating 100 observations of a 15-dimensional output space with a minibatching procedure of batch size 5, `N_obs = 5 × 15 = 75`.
183 changes: 167 additions & 16 deletions docs/src/observations.md
@@ -1,23 +1,174 @@
# Observations

The `Observation` object and its related utilities facilitate convenient storage, grouping, and minibatching of observations.

## The key objects
1. The `Observation` is a container for observed variables ("samples"), their noise covariances ("covariances"), and names ("names"). Observations are easily stackable, which helps when building larger heterogeneous observations.
2. The `Minibatcher` facilitates data streaming (minibatching): a user can submit a large group of observations, which are then batched up and looped over in epochs.
3. The `ObservationSeries` contains the list of `Observation`s and the `Minibatcher`, along with utilities such as retrieving the current batch.

!!! note "I usually just pass in a vector of data and a covariance to EKP"
    Users can indeed set up an experiment with just one data sample and a covariance matrix for the noise. However, internally these are still stored as an `ObservationSeries`, with a special minibatcher that does nothing (created by `no_minibatcher(size)`).
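
The following is a minimal sketch of this equivalence, assuming an initial ensemble `initial_ensemble`, a data vector `y`, and a noise covariance `Γ` are already defined (the observation name `"y"` is purely illustrative):

```julia
# passing a data vector and covariance directly ...
ekp = EnsembleKalmanProcess(initial_ensemble, y, Γ, Inversion())

# ... is handled internally much like this explicit construction
observation = Observation(Dict("samples" => y, "covariances" => Γ, "names" => "y"))
ekp = EnsembleKalmanProcess(initial_ensemble, observation, Inversion())
```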

## Recommended constructor: A single (stacked) observation

Here the user has data for two independent variables: the five-dimensional `y` and the eight-dimensional `z`. The observations of `y` are all independent, while the observations of `z` have some correlation structure (tridiagonal in the example below).

We recommend users build an `Observation` using the `Dict` constructor and make use of the `combine_observations()` utility.
```@example ex1
using EnsembleKalmanProcesses # for `Observation`
using LinearAlgebra # for `I`, `Tridiagonal`

# observe variable y with some diagonal noise
y = ones(5)
cov_y = 0.01*I

# observe variable z with some tri-diagonal noise
zdim = 8
z = zeros(zdim)
cov_z = Tridiagonal(0.1*ones(zdim-1), ones(zdim), 0.1*ones(zdim-1))

y_obs = Observation(
Dict(
"samples" => y,
"covariances" => cov_y,
"names" => "y",
),
)

z_obs = Observation(
Dict(
"samples" => z,
"covariances" => cov_z,
"names" => "z",
),
)

full_obs = combine_observations([y_obs,z_obs]) # conveniently stacks the observations
```

```@example ex1
# getting information out
get_obs(full_obs) # returns [y,z]
```
```@example ex1
get_obs_noise_cov(full_obs) # returns block-diagonal matrix with blocks [cov_y 0; 0 cov_z]
```
Getters of the form `get_*` can also be used to retrieve other internally stored information, for example:
```@example ex1
get_names(full_obs)
```
Some other fields are also stored, such as the indices of the `y` and `z` components:
```@example ex1
get_indices(full_obs)
```

## Recommended constructor: Many stacked observations

Imagine the user has 100 independent data samples of the two variables above, where the `k`th sample of `y` is `k*ones(5)`, for each `k = 1:100`.
Rather than stacking all the data together at once (forming a full system of size `100*(8+5)` to update at each step), the user wishes to stream the data and perform updates with random batches of 5 observations at each iteration.

!!! note "Why would I choose to minibatch?"
    The memory and time scaling of many EKP methods is worse than linear in the observation dimension, so there is often a large computational benefit to minibatching EKP updates. These savings must be weighed against the cost of the additional forward-map evaluations needed to loop over a full epoch of minibatches.

```@setup ex2
using EnsembleKalmanProcesses
using LinearAlgebra

hundred_full_obs = []
for k = 1:100
y = k*ones(5)
cov_y = 0.01*I(5)

z = zeros(8)
cov_z = Tridiagonal(0.1*ones(7), ones(8), 0.1*ones(7))

y_obs = Observation(
Dict(
"samples" => y,
"covariances" => cov_y,
"names" => "y_$k",
),
)

z_obs = Observation(
Dict(
"samples" => z,
"covariances" => cov_z,
"names" => "z_$k",
),
)
push!(hundred_full_obs, combine_observations([y_obs,z_obs]))
end
T = promote_type((typeof(h) for h in hundred_full_obs)...)
hundred_full_obs = [convert(T, h) for h in hundred_full_obs]
```

```@example ex2
# given a vector of 100 `Observation`s called hundred_full_obs, where `y_k = k*y`
using EnsembleKalmanProcesses # for `RandomFixedSizeMinibatcher`, `ObservationSeries`, `Minibatcher`

minibatcher = RandomFixedSizeMinibatcher(5) # batches the epoch of size 100, into batches of size 5

observation_series = ObservationSeries(
Dict(
"observations" => hundred_full_obs,
"minibatcher" => minibatcher,
),
)
```

```@example ex2
# some example methods to get information out at the current minibatch:
get_current_minibatch(observation_series) # returns [i₁, ..., i₅], the current minibatch subset of indices 1:100
```

```@example ex2
get_obs(observation_series) # returns [yi₁, zi₁, ..., yi₅, zi₅], the data sample for the current minibatch
```

```@example ex2
get_obs_noise_cov(observation_series) # returns block-diagonal matrix with blocks [cov_yi₁ 0 ... 0 ; 0 cov_zi₁ 0 ... 0; ... ; 0 ... 0 cov_yi₅ 0; 0 ... 0 cov_zi₅]
```

Minibatches are updated internally during the `update_ensemble!(ekp, ...)` step via a call to
```@example ex2
update_minibatch!(observation_series)
get_current_minibatch(observation_series)
```
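
The user does not normally need to call `update_minibatch!` directly. A rough sketch of how this fits together (assuming, as implied above, that the `ObservationSeries` can be passed directly to the `EnsembleKalmanProcess` constructor; `initial_ensemble` and `g_ens` are illustrative):

```julia
ekp = EnsembleKalmanProcess(initial_ensemble, observation_series, Inversion())
# ... evaluate the forward map on the current minibatch to obtain g_ens, then:
update_ensemble!(ekp, g_ens)              # performs the EKP update ...
get_current_minibatch(observation_series) # ... and the minibatch has been advanced internally
```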



## Minibatchers

Minibatchers are modular and must be derived from the `Minibatcher` abstract type. They contain a method `create_new_epoch!(minibatcher, args...; kwargs...)` that creates a sampling of an epoch of data. For example, if we have 100 data observations, the epoch is `1:100`, and one possible minibatching is a random partitioning of `1:100` into batches of a given size (e.g., 5), leading to 20 minibatches.

Some of the implemented Minibatchers (a brief usage sketch follows this list):
- `FixedMinibatcher(given_batches, "order")` (default `method = "order"`): the minibatches are fixed and run through in order in every epoch
- `FixedMinibatcher(given_batches, "random")`: the minibatches are fixed, but are run through in a randomly chosen order in each epoch
- `no_minibatcher(size)`: creates a `FixedMinibatcher` with just one batch, the whole epoch (effectively no minibatching)
- `RandomFixedSizeMinibatcher(minibatch_size, "trim")` (default `method = "trim"`): creates minibatches of size `minibatch_size` by randomly sampling the epoch; if the minibatch size does not divide the number of samples, the remainder is ignored (thus preserving a constant batch size)
- `RandomFixedSizeMinibatcher(minibatch_size, "extend")`: creates minibatches of size `minibatch_size` by randomly sampling the epoch; if the minibatch size does not divide the number of samples, the remainder is included in the final batch (thus covering the entire dataset, with a larger final batch)
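
The sketch below constructs these minibatchers directly (the batch indices are illustrative; in typical use the `ObservationSeries` drives the minibatcher, so these calls are rarely needed by hand):

```julia
given_batches = [collect(((i - 1) * 5 + 1):(i * 5)) for i in 1:20]  # 20 fixed batches of size 5 covering 1:100

fixed = FixedMinibatcher(given_batches, "order")  # cycle through the given batches in order
create_new_epoch!(fixed)                          # set up the batch ordering for a new epoch
get_minibatches(fixed)                            # the batches used in this epoch

random = RandomFixedSizeMinibatcher(5, "trim")    # random batches of size 5, remainder trimmed
```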

## Identifiers

One can additionally provide a vector of `names` to name each `Observation` in the `ObservationSeries`, by using the `Dict` entry `"names" => names`.

To think about the difference between the identifiers for `Observation` and `ObservationSeries`, consider an application observing the average state of a dynamical system over 100 time windows, where the time windows will be batched over during the calibration.
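
For example, such a series might be constructed as follows (a sketch reusing `hundred_full_obs` and the minibatcher from above; the window names are illustrative):

```julia
yz_observation_series = ObservationSeries(
    Dict(
        "observations" => hundred_full_obs,
        "minibatcher" => RandomFixedSizeMinibatcher(5),
        "names" => ["window_$k" for k in 1:100],
    ),
)
```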

The compiled information is given in the object:
```julia
yz_observation_series::ObservationSeries
```
As this contains many time windows, setting the names of the `ObservationSeries` entries to index the time windows is a sensible choice of identifier, for example,
```julia
get_names(yz_observation_series)
> ["window_1", "window_2", ..., "window_100"]
```
The individual `Observation`s should refer only to the state being measured, so suitable identifiers might be, for example,
```julia
obs = get_observations(yz_observation_series)[1] # get first observation in the series
get_names(obs)
> ["y_window_average", "z_window_average"]
```

19 changes: 4 additions & 15 deletions examples/AerosolActivation/aerosol_activation.jl
@@ -139,40 +139,29 @@ nothing # hide
# calibrate is also used to generate observations.
# We use the total number and mass of activated aerosol particles as
# our observational data.
observation_data_names = ["N_act", "M_act"];
observation_data_names = ["N_act_and_M_act"];
nothing # hide

# We generate artificial truth samples based on the default values
# of parameters we are calibrating.
n_samples = 10
G_t = run_activation_model(molar_mass_true, osmotic_coeff_true)
y_t = zeros(length(G_t), n_samples)

Γy = convert(Array, LinearAlgebra.Diagonal([0.01 * G_t[1], 0.01 * G_t[2]]))
μ = zeros(length(G_t));
nothing # hide

# And add noise to the generated truth sample.
for i in 1:n_samples
y_t[:, i] = G_t .+ rand(Distributions.MvNormal(μ, Γy))
end

truth_array = EKP.Observations.Observation(y_t, Γy, observation_data_names)
nothing # hide
y_t = G_t .+ rand(Distributions.MvNormal(μ, Γy))

# One could try for the truth to be a mean of the generated array.
# Or do the calibration for all individual truth samples and then
# compute the mean of calibrated parameters.
# For now we are just taking one truth array member.
truth_sample = truth_array.samples[1];
observation = EKP.Observation(Dict("samples" => y_t, "covariances" => Γy, "names" => observation_data_names))
nothing # hide

# We use 50 ensemble members and do 10 iterations.
N_ens = 50
N_iter = 10

initial_par = EKP.construct_initial_ensemble(rng, priors, N_ens)
ekiobj = EKP.EnsembleKalmanProcess(initial_par, truth_sample, truth_array.obs_noise_cov, EKP.Inversion())
ekiobj = EKP.EnsembleKalmanProcess(initial_par, observation, EKP.Inversion())
nothing # hide

# Finally, we can run the Ensemble Kalman Process calibration.
1 change: 0 additions & 1 deletion examples/ClimateMachine/init_calibration.jl
@@ -2,7 +2,6 @@ using Distributions
using ArgParse
# Import EnsembleKalmanProcesses modules
using EnsembleKalmanProcesses
using EnsembleKalmanProcesses.Observations
using EnsembleKalmanProcesses.ParameterDistributions
include(joinpath(@__DIR__, "helper_funcs.jl"))

1 change: 0 additions & 1 deletion examples/ClimateMachine/sstep_calibration.jl
@@ -5,7 +5,6 @@ using NCDatasets
using LinearAlgebra
# Import EnsembleKalmanProcesses modules
using EnsembleKalmanProcesses
using EnsembleKalmanProcesses.Observations
using EnsembleKalmanProcesses.ParameterDistributions

include(joinpath(@__DIR__, "helper_funcs.jl"))
14 changes: 3 additions & 11 deletions examples/Cloudy/Cloudy_example_eki.jl
@@ -17,7 +17,6 @@ using Random

# Import Calibrate-Emulate-Sample modules
using EnsembleKalmanProcesses
using EnsembleKalmanProcesses.Observations
using EnsembleKalmanProcesses.ParameterDistributions
using EnsembleKalmanProcesses.DataContainers

Expand Down Expand Up @@ -71,7 +70,7 @@ priors = combine_distributions([prior_N0, prior_θ, prior_k])
### Define the data from which we want to learn the parameters
###

data_names = ["M0", "M1", "M2"]
data_names = ["M0_M1_M2"]
moments = [0.0, 1.0, 2.0]
n_moments = length(moments)

Expand Down Expand Up @@ -111,8 +110,7 @@ for i in 1:n_samples
y_t[:, i] = G_t .+ rand(MvNormal(μ, Γy))
end

truth = Observations.Observation(y_t, Γy, data_names)
truth_sample = truth.mean
truth = Observation(Dict("samples" => vec(mean(y_t, dims = 2)), "covariances" => Γy, "names" => data_names))


###
@@ -124,13 +122,7 @@ N_iter = 8 # number of EKI iterations
# initial parameters: N_par x N_ens

initial_par = construct_initial_ensemble(rng, priors, N_ens)
ekiobj = EnsembleKalmanProcess(
initial_par,
truth_sample,
truth.obs_noise_cov,
Inversion(),
timestepper = DefaultTimestepper(0.1),
)
ekiobj = EnsembleKalmanProcess(initial_par, truth, Inversion(), timestepper = DefaultTimestepper(0.1))

# Initialize a ParticleDistribution with dummy parameters. The parameters
# will then be set within `run_dyn_model`