Skip to content

Commit

Permalink
Generalize file splitting for output writers (#3515)
Browse files Browse the repository at this point in the history
* Generalize file splitting for JLD2OutputWriter so that alternative criterion to file size may be used

* Update src/OutputWriters/jld2_output_writer.jl

Co-authored-by: Navid C. Constantinou <[email protected]>

* Export FileSizeLimit and update JLD2OutputWriter test

* implementing file splitting in netcdf

* Properly export FileSizeLimit

* fix handeling of path writer.filepath with the file_splitting

* add support to file splitting by size in netCDFs

* add support to file splitting by size in netCDFs

* update warning  to properly print variable.

Co-authored-by: Gregory L. Wagner <[email protected]>

* return to the use of FileSizeLimit(200KiB) in the test to make it easier to read

* update netcdf to match jld2, and add return in update_file_splitting_schedule

* fix tests filesize tests

* Apply suggestions from code review

* Update test_jld2_output_writer.jl

* fix show for NetCDFOutputWriter

* fix doctests

* Update src/OutputWriters/netcdf_output_writer.jl

Co-authored-by: Gregory L. Wagner <[email protected]>

* Update src/OutputWriters/jld2_output_writer.jl

Co-authored-by: Gregory L. Wagner <[email protected]>

* fix doctests

* fix doctests

* fix doctest

* cleanup unecessary imports

* fix doctest

* fix doctests

* fix doctests

---------

Co-authored-by: Navid C. Constantinou <[email protected]>
Co-authored-by: josuemtzmo <[email protected]>
  • Loading branch information
3 people committed Mar 24, 2024
1 parent adb728c commit 1104449
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 76 deletions.
1 change: 0 additions & 1 deletion benchmark/benchmark_multi_GPU.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ using Oceananigans.Models.HydrostaticFreeSurfaceModels:
ExplicitFreeSurface

using Oceananigans.Utils: prettytime, hours
using Oceananigans.OutputWriters: JLD2OutputWriter, TimeInterval, IterationInterval

using Oceananigans.MultiRegion
using Oceananigans.TurbulenceClosures: VerticallyImplicitTimeDiscretization
Expand Down
20 changes: 15 additions & 5 deletions docs/src/model_setup/output_writers.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ NetCDFOutputWriter scheduled on TimeInterval(1 minute):
├── dimensions: zC(16), zF(17), xC(16), yF(16), xF(16), yC(16), time(0)
├── 2 outputs: (c, u)
└── array type: Array{Float64}
├── file_splitting: NoFileSplitting
└── file size: 14.8 KiB
```

```jldoctest netcdf1
Expand All @@ -83,6 +85,8 @@ NetCDFOutputWriter scheduled on TimeInterval(1 minute):
├── dimensions: zC(1), zF(1), xC(16), yF(16), xF(16), yC(16), time(0)
├── 2 outputs: (c, u)
└── array type: Array{Float64}
├── file_splitting: NoFileSplitting
└── file size: 14.8 KiB
```

```jldoctest netcdf1
Expand All @@ -98,6 +102,8 @@ NetCDFOutputWriter scheduled on TimeInterval(1 minute):
├── dimensions: zC(16), zF(17), xC(1), yF(1), xF(1), yC(1), time(0)
├── 2 outputs: (c, u) averaged on AveragedTimeInterval(window=20 seconds, stride=1, interval=1 minute)
└── array type: Array{Float64}
├── file_splitting: NoFileSplitting
└── file size: 17.6 KiB
```

`NetCDFOutputWriter` also accepts output functions that write scalars and arrays to disk,
Expand Down Expand Up @@ -148,6 +154,8 @@ NetCDFOutputWriter scheduled on IterationInterval(1):
├── dimensions: zC(16), zF(17), xC(16), yF(16), xF(16), yC(16), time(0)
├── 3 outputs: (profile, slice, scalar)
└── array type: Array{Float64}
├── file_splitting: NoFileSplitting
└── file size: 17.8 KiB
```

See [`NetCDFOutputWriter`](@ref) for more information.
Expand Down Expand Up @@ -196,7 +204,8 @@ JLD2OutputWriter scheduled on TimeInterval(20 minutes):
├── 3 outputs: (u, v, w)
├── array type: Array{Float64}
├── including: [:grid, :coriolis, :buoyancy, :closure]
└── max filesize: Inf YiB
├── file_splitting: NoFileSplitting
└── file size: 27.4 KiB
```

and a time- and horizontal-average of tracer `c` every 20 minutes of simulation time
Expand All @@ -213,7 +222,8 @@ JLD2OutputWriter scheduled on TimeInterval(20 minutes):
├── 1 outputs: c averaged on AveragedTimeInterval(window=5 minutes, stride=1, interval=20 minutes)
├── array type: Array{Float64}
├── including: [:grid, :coriolis, :buoyancy, :closure]
└── max filesize: Inf YiB
├── file_splitting: NoFileSplitting
└── file size: 17.5 KiB
```


Expand All @@ -239,7 +249,7 @@ time `interval`. The ``t_i`` specify both the end of the averaging window and th
Building an `AveragedTimeInterval` that averages over a 1 day window, every 4 days,

```jldoctest averaged_time_interval
using Oceananigans.OutputWriters: AveragedTimeInterval
using Oceananigans
using Oceananigans.Units
schedule = AveragedTimeInterval(4days, window=1day)
Expand All @@ -253,7 +263,6 @@ to time-average its outputs before writing them to disk:

```jldoctest averaged_time_interval
using Oceananigans
using Oceananigans.OutputWriters: JLD2OutputWriter
using Oceananigans.Units
model = NonhydrostaticModel(grid=RectilinearGrid(size=(1, 1, 1), extent=(1, 1, 1)))
Expand All @@ -270,5 +279,6 @@ JLD2OutputWriter scheduled on TimeInterval(4 days):
├── 3 outputs: (u, v, w) averaged on AveragedTimeInterval(window=1 day, stride=2, interval=4 days)
├── array type: Array{Float64}
├── including: [:grid, :coriolis, :buoyancy, :closure]
└── max filesize: Inf YiB
├── file_splitting: NoFileSplitting
└── file size: 26.7 KiB
```
2 changes: 1 addition & 1 deletion src/Oceananigans.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export
# Output writers
NetCDFOutputWriter, JLD2OutputWriter, Checkpointer,
TimeInterval, IterationInterval, AveragedTimeInterval, SpecifiedTimes,
AndSchedule, OrSchedule, written_names,
FileSizeLimit, AndSchedule, OrSchedule, written_names,

# Output readers
FieldTimeSeries, FieldDataset, InMemory, OnDisk,
Expand Down
3 changes: 2 additions & 1 deletion src/OutputWriters/OutputWriters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module OutputWriters

export
JLD2OutputWriter, NetCDFOutputWriter, written_names,
Checkpointer, WindowedTimeAverage,
Checkpointer, WindowedTimeAverage, FileSizeLimit,
TimeInterval, IterationInterval, WallTimeInterval, AveragedTimeInterval

using CUDA
Expand All @@ -15,6 +15,7 @@ using Oceananigans.Models
using Oceananigans: AbstractOutputWriter
using Oceananigans.Grids: interior_indices
using Oceananigans.Utils: TimeInterval, IterationInterval, WallTimeInterval, instantiate
using Oceananigans.Utils: pretty_filesize

using OffsetArrays

Expand Down
67 changes: 36 additions & 31 deletions src/OutputWriters/jld2_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ using Printf
using JLD2
using Oceananigans.Utils
using Oceananigans.Models
using Oceananigans.Utils: TimeInterval, pretty_filesize, prettykeys
using Oceananigans.Utils: TimeInterval, prettykeys
using Oceananigans.Fields: boundary_conditions, indices

default_included_properties(::NonhydrostaticModel) = [:grid, :coriolis, :buoyancy, :closure]
default_included_properties(::ShallowWaterModel) = [:grid, :coriolis, :closure]
default_included_properties(::HydrostaticFreeSurfaceModel) = [:grid, :coriolis, :buoyancy, :closure]

mutable struct JLD2OutputWriter{O, T, D, IF, IN, KW} <: AbstractOutputWriter
mutable struct JLD2OutputWriter{O, T, D, IF, IN, FS, KW} <: AbstractOutputWriter
filepath :: String
outputs :: O
schedule :: T
array_type :: D
init :: IF
including :: IN
part :: Int
max_filesize :: Float64
file_splitting :: FS
overwrite_existing :: Bool
verbose :: Bool
jld2_kw :: KW
Expand All @@ -32,7 +32,7 @@ ext(::Type{JLD2OutputWriter}) = ".jld2"
indices = (:, :, :),
with_halos = false,
array_type = Array{Float64},
max_filesize = Inf,
file_splitting = NoFileSplitting(),
overwrite_existing = false,
init = noinit,
including = [:grid, :coriolis, :buoyancy, :closure],
Expand All @@ -54,7 +54,7 @@ Keyword arguments
## Filenaming
- `filename` (required): Descriptive filename. `".jld2"` is appended to `filename` in the file path
if `filename` does not end in `".jld2"`.
if `filename` does not end in `".jld2"`.
- `dir`: Directory to save output to. Default: `"."` (current working directory).
Expand All @@ -80,10 +80,12 @@ Keyword arguments
## File management
- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
Defaults to `Inf`.
- `file_splitting`: Schedule for splitting the output file. The new files will be suffixed with
`_part1`, `_part2`, etc. For example `file_splitting = FileSizeLimit(sz)` will
split the output file when its size exceeds `sz`. Another example is
`file_splitting = TimeInterval(30days)`, which will split files every 30 days of
simulation time. The default incurs no splitting (`NoFileSplitting()`).
- `overwrite_existing`: Remove existing files if their filenames conflict.
Default: `false`.
Expand All @@ -100,7 +102,7 @@ Keyword arguments
- `verbose`: Log what the output writer is doing with statistics on compute/write times and file sizes.
Default: `false`.
- `part`: The starting part number used if `max_filesize` is finite.
- `part`: The starting part number used when file splitting.
Default: 1.
- `jld2_kw`: Dict of kwargs to be passed to `jldopen` when data is written.
Expand Down Expand Up @@ -138,7 +140,8 @@ JLD2OutputWriter scheduled on TimeInterval(20 minutes):
├── 3 outputs: (u, v, w)
├── array type: Array{Float64}
├── including: [:grid, :coriolis, :buoyancy, :closure]
└── max filesize: Inf YiB
├── file_splitting: NoFileSplitting
└── file size: 27.4 KiB
```
and a time- and horizontal-average of tracer ``c`` every 20 minutes of simulation time
Expand All @@ -155,15 +158,16 @@ JLD2OutputWriter scheduled on TimeInterval(20 minutes):
├── 1 outputs: c averaged on AveragedTimeInterval(window=5 minutes, stride=1, interval=20 minutes)
├── array type: Array{Float64}
├── including: [:grid, :coriolis, :buoyancy, :closure]
└── max filesize: Inf YiB
├── file_splitting: NoFileSplitting
└── file size: 17.5 KiB
```
"""
function JLD2OutputWriter(model, outputs; filename, schedule,
dir = ".",
indices = (:, :, :),
with_halos = false,
array_type = Array{Float64},
max_filesize = Inf,
file_splitting = NoFileSplitting(),
overwrite_existing = false,
init = noinit,
including = default_included_properties(model),
Expand All @@ -174,18 +178,19 @@ function JLD2OutputWriter(model, outputs; filename, schedule,
mkpath(dir)
filename = auto_extension(filename, ".jld2")
filepath = joinpath(dir, filename)
update_file_splitting_schedule!(file_splitting, filepath)
overwrite_existing && isfile(filepath) && rm(filepath, force=true)

outputs = NamedTuple(Symbol(name) => construct_output(outputs[name], model.grid, indices, with_halos)
for name in keys(outputs))

# Convert each output to WindowedTimeAverage if schedule::AveragedTimeWindow is specified
schedule, outputs = time_average_outputs(schedule, outputs, model)

initialize_jld2_file!(filepath, init, jld2_kw, including, outputs, model)

return JLD2OutputWriter(filepath, outputs, schedule, array_type, init,
including, part, max_filesize, overwrite_existing, verbose, jld2_kw)
including, part, file_splitting, overwrite_existing, verbose, jld2_kw)
end

function initialize_jld2_file!(filepath, init, jld2_kw, including, outputs, model)
Expand Down Expand Up @@ -247,18 +252,17 @@ end
function write_output!(writer::JLD2OutputWriter, model)

verbose = writer.verbose
path = writer.filepath
current_iteration = model.clock.iteration

# Some logic to handle writing to existing files
if iteration_exists(path, current_iteration)
if iteration_exists(writer.filepath, current_iteration)

if writer.overwrite_existing
# Something went wrong, so we remove the file and re-initialize it.
rm(path, force=true)
rm(writer.filepath, force=true)
initialize_jld2_file!(writer, model)
else # nothing we can do since we were asked not to overwrite_existing, so we skip output writing
@warn "Iteration $current_iteration was found in $path. Skipping output writing (for now...)"
@warn "Iteration $current_iteration was found in $(writer.filepath). Skipping output writing (for now...)"
end

else # ok let's do this
Expand All @@ -271,16 +275,15 @@ function write_output!(writer::JLD2OutputWriter, model)

verbose && @info "Fetching time: $(prettytime(tc))"

# Start a new file if the filesize exceeds max_filesize
filesize(path) >= writer.max_filesize && start_next_file(model, writer)
path = writer.filepath # we might have a new path...

# Start a new file if the file_splitting(model) is true
writer.file_splitting(model) && start_next_file(model, writer)
update_file_splitting_schedule!(writer.file_splitting, writer.filepath)
# Write output from `data`
verbose && @info "Writing JLD2 output $(keys(writer.outputs)) to $path..."

start_time, old_filesize = time_ns(), filesize(path)
jld2output!(path, model.clock.iteration, model.clock.time, data, writer.jld2_kw)
end_time, new_filesize = time_ns(), filesize(path)
start_time, old_filesize = time_ns(), filesize(writer.filepath)
jld2output!(writer.filepath, model.clock.iteration, model.clock.time, data, writer.jld2_kw)
end_time, new_filesize = time_ns(), filesize(writer.filepath)

verbose && @info @sprintf("Writing done: time=%s, size=%s, Δsize=%s",
prettytime((end_time - start_time) / 1e9),
Expand Down Expand Up @@ -311,9 +314,10 @@ end

function start_next_file(model, writer::JLD2OutputWriter)
verbose = writer.verbose
sz = filesize(writer.filepath)

verbose && @info begin
"Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(writer.max_filesize))."
schedule_type = summary(writer.file_splitting)
"Splitting output because $(schedule_type) is activated."
end

if writer.part == 1
Expand All @@ -329,7 +333,7 @@ function start_next_file(model, writer::JLD2OutputWriter)
verbose && @info "Now writing to: $(writer.filepath)"

initialize_jld2_file!(writer, model)

return nothing
end

Expand All @@ -346,5 +350,6 @@ function Base.show(io::IO, ow::JLD2OutputWriter)
"├── $Noutputs outputs: ", prettykeys(ow.outputs), show_averaging_schedule(averaging_schedule), "\n",
"├── array type: ", show_array_type(ow.array_type), "\n",
"├── including: ", ow.including, "\n",
"└── max filesize: ", pretty_filesize(ow.max_filesize))
"├── file_splitting: ", summary(ow.file_splitting), "\n",
"└── file size: ", pretty_filesize(filesize(ow.filepath)))
end
Loading

0 comments on commit 1104449

Please sign in to comment.