Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize file splitting for output writers #3515

Merged
merged 29 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e06d78e
Generalize file splitting for JLD2OutputWriter so that alternative cr…
glwagner Mar 15, 2024
b1ad9b6
Update src/OutputWriters/jld2_output_writer.jl
glwagner Mar 16, 2024
183e729
Export FileSizeLimit and update JLD2OutputWriter test
glwagner Mar 16, 2024
0f5fedb
implementing file splitting in netcdf
josuemtzmo Mar 16, 2024
9276ba6
Properly export FileSizeLimit
glwagner Mar 16, 2024
6c6e07e
fix handeling of path writer.filepath with the file_splitting
josuemtzmo Mar 17, 2024
ab7fec6
merge with glw/generalized-file-splitting
josuemtzmo Mar 17, 2024
2937dc1
add support to file splitting by size in netCDFs
josuemtzmo Mar 17, 2024
4f9af7e
add support to file splitting by size in netCDFs
josuemtzmo Mar 17, 2024
73a94ef
update warning to properly print variable.
josuemtzmo Mar 18, 2024
cd30f3e
return to the use of FileSizeLimit(200KiB) in the test to make it eas…
josuemtzmo Mar 18, 2024
2f0e4f6
update netcdf to match jld2, and add return in update_file_splitting_…
josuemtzmo Mar 18, 2024
09c4abb
fix tests filesize tests
josuemtzmo Mar 18, 2024
db94858
Apply suggestions from code review
navidcy Mar 22, 2024
14ce9a8
Merge branch 'main' into glw/generalized-file-splitting
navidcy Mar 22, 2024
4804fc2
Update test_jld2_output_writer.jl
navidcy Mar 22, 2024
3a846e5
Merge branch 'main' into glw/generalized-file-splitting
josuemtzmo Mar 22, 2024
4c1db46
fix show for NetCDFOutputWriter
navidcy Mar 23, 2024
f9b082f
fix doctests
navidcy Mar 23, 2024
527ac31
Update src/OutputWriters/netcdf_output_writer.jl
navidcy Mar 23, 2024
78998bc
Update src/OutputWriters/jld2_output_writer.jl
navidcy Mar 23, 2024
60de913
fix doctests
navidcy Mar 23, 2024
fc49fb7
Merge branch 'glw/generalized-file-splitting' of github.com:CliMA/Oce…
navidcy Mar 23, 2024
d8233b0
fix doctests
navidcy Mar 23, 2024
28cc203
fix doctest
navidcy Mar 24, 2024
97fa9bb
cleanup unecessary imports
navidcy Mar 24, 2024
71dab3a
fix doctest
navidcy Mar 24, 2024
4c1cb92
fix doctests
navidcy Mar 24, 2024
6c0e974
fix doctests
navidcy Mar 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Oceananigans.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export
# Output writers
NetCDFOutputWriter, JLD2OutputWriter, Checkpointer,
TimeInterval, IterationInterval, AveragedTimeInterval, SpecifiedTimes,
AndSchedule, OrSchedule, written_names,
FileSizeLimit, AndSchedule, OrSchedule, written_names,

# Output readers
FieldTimeSeries, FieldDataset, InMemory, OnDisk,
Expand Down
3 changes: 2 additions & 1 deletion src/OutputWriters/OutputWriters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module OutputWriters

export
JLD2OutputWriter, NetCDFOutputWriter, written_names,
Checkpointer, WindowedTimeAverage,
Checkpointer, WindowedTimeAverage, FileSizeLimit,
TimeInterval, IterationInterval, WallTimeInterval, AveragedTimeInterval

using CUDA
Expand All @@ -15,6 +15,7 @@ using Oceananigans.Models
using Oceananigans: AbstractOutputWriter
using Oceananigans.Grids: interior_indices
using Oceananigans.Utils: TimeInterval, IterationInterval, WallTimeInterval, instantiate
using Oceananigans.Utils: pretty_filesize

using OffsetArrays

Expand Down
54 changes: 29 additions & 25 deletions src/OutputWriters/jld2_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ using Printf
using JLD2
using Oceananigans.Utils
using Oceananigans.Models
using Oceananigans.Utils: TimeInterval, pretty_filesize, prettykeys
using Oceananigans.Utils: TimeInterval, prettykeys
using Oceananigans.Fields: boundary_conditions, indices

default_included_properties(::NonhydrostaticModel) = [:grid, :coriolis, :buoyancy, :closure]
default_included_properties(::ShallowWaterModel) = [:grid, :coriolis, :closure]
default_included_properties(::HydrostaticFreeSurfaceModel) = [:grid, :coriolis, :buoyancy, :closure]

mutable struct JLD2OutputWriter{O, T, D, IF, IN, KW} <: AbstractOutputWriter
mutable struct JLD2OutputWriter{O, T, D, IF, IN, FS, KW} <: AbstractOutputWriter
filepath :: String
outputs :: O
schedule :: T
array_type :: D
init :: IF
including :: IN
part :: Int
max_filesize :: Float64
file_splitting :: FS
overwrite_existing :: Bool
verbose :: Bool
jld2_kw :: KW
Expand All @@ -32,7 +32,7 @@ ext(::Type{JLD2OutputWriter}) = ".jld2"
indices = (:, :, :),
with_halos = false,
array_type = Array{Float64},
max_filesize = Inf,
file_splitting = NoFileSplitting(),
overwrite_existing = false,
init = noinit,
including = [:grid, :coriolis, :buoyancy, :closure],
Expand Down Expand Up @@ -80,10 +80,12 @@ Keyword arguments

## File management

- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
Defaults to `Inf`.

- `file_splitting`: Schedule for splitting the output file. The new files will be suffixed with
`_part1`, `_part2`, etc. For example `file_splitting = FileSizeLimit(sz)` will
split the output file when its size exceeds `sz`. Another example is
`file_splitting = TimeInterval(30days)`, which will split files every 30 days of
simulation time. The default incurs no splitting (`NoFileSplitting()`).

- `overwrite_existing`: Remove existing files if their filenames conflict.
Default: `false`.

Expand All @@ -100,7 +102,7 @@ Keyword arguments
- `verbose`: Log what the output writer is doing with statistics on compute/write times and file sizes.
Default: `false`.

- `part`: The starting part number used if `max_filesize` is finite.
- `part`: The starting part number used when file splitting.
Default: 1.

- `jld2_kw`: Dict of kwargs to be passed to `jldopen` when data is written.
Expand Down Expand Up @@ -163,7 +165,7 @@ function JLD2OutputWriter(model, outputs; filename, schedule,
indices = (:, :, :),
with_halos = false,
array_type = Array{Float64},
max_filesize = Inf,
file_splitting = NoFileSplitting(),
overwrite_existing = false,
init = noinit,
including = default_included_properties(model),
Expand All @@ -174,6 +176,7 @@ function JLD2OutputWriter(model, outputs; filename, schedule,
mkpath(dir)
filename = auto_extension(filename, ".jld2")
filepath = joinpath(dir, filename)
update_file_splitting_schedule!(file_splitting, filepath)
overwrite_existing && isfile(filepath) && rm(filepath, force=true)

outputs = NamedTuple(Symbol(name) => construct_output(outputs[name], model.grid, indices, with_halos)
Expand All @@ -185,7 +188,7 @@ function JLD2OutputWriter(model, outputs; filename, schedule,
initialize_jld2_file!(filepath, init, jld2_kw, including, outputs, model)

return JLD2OutputWriter(filepath, outputs, schedule, array_type, init,
including, part, max_filesize, overwrite_existing, verbose, jld2_kw)
including, part, file_splitting, overwrite_existing, verbose, jld2_kw)
end

function initialize_jld2_file!(filepath, init, jld2_kw, including, outputs, model)
Expand Down Expand Up @@ -247,18 +250,17 @@ end
function write_output!(writer::JLD2OutputWriter, model)

verbose = writer.verbose
path = writer.filepath
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Collaborator

@josuemtzmo josuemtzmo Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because path was used later on in the code, but it wasn't updated. Thus the code crashed while creating the file and when the writer.filepath changed but not the path . The easier fix was to replace all instances of path by the writer.filepath.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's a good reason!

current_iteration = model.clock.iteration

# Some logic to handle writing to existing files
if iteration_exists(path, current_iteration)
if iteration_exists(writer.filepath, current_iteration)

if writer.overwrite_existing
# Something went wrong, so we remove the file and re-initialize it.
rm(path, force=true)
rm(writer.filepath, force=true)
initialize_jld2_file!(writer, model)
else # nothing we can do since we were asked not to overwrite_existing, so we skip output writing
@warn "Iteration $current_iteration was found in $path. Skipping output writing (for now...)"
@warn "Iteration $current_iteration was found in $(writer.filepath). Skipping output writing (for now...)"
end

else # ok let's do this
Expand All @@ -271,16 +273,15 @@ function write_output!(writer::JLD2OutputWriter, model)

verbose && @info "Fetching time: $(prettytime(tc))"

# Start a new file if the filesize exceeds max_filesize
filesize(path) >= writer.max_filesize && start_next_file(model, writer)
path = writer.filepath # we might have a new path...

# Start a new file if the file_splitting(model) is true
writer.file_splitting(model) && start_next_file(model, writer)
update_file_splitting_schedule!(writer.file_splitting, writer.filepath)
# Write output from `data`
verbose && @info "Writing JLD2 output $(keys(writer.outputs)) to $path..."

start_time, old_filesize = time_ns(), filesize(path)
jld2output!(path, model.clock.iteration, model.clock.time, data, writer.jld2_kw)
end_time, new_filesize = time_ns(), filesize(path)
start_time, old_filesize = time_ns(), filesize(writer.filepath)
jld2output!(writer.filepath, model.clock.iteration, model.clock.time, data, writer.jld2_kw)
end_time, new_filesize = time_ns(), filesize(writer.filepath)

verbose && @info @sprintf("Writing done: time=%s, size=%s, Δsize=%s",
prettytime((end_time - start_time) / 1e9),
Expand Down Expand Up @@ -311,9 +312,10 @@ end

function start_next_file(model, writer::JLD2OutputWriter)
verbose = writer.verbose
sz = filesize(writer.filepath)

verbose && @info begin
"Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(writer.max_filesize))."
schedule_type = summary(writer.file_splitting)
"Splitting output because $(schedule_type) has been actuated."
navidcy marked this conversation as resolved.
Show resolved Hide resolved
end

if writer.part == 1
Expand Down Expand Up @@ -346,5 +348,7 @@ function Base.show(io::IO, ow::JLD2OutputWriter)
"├── $Noutputs outputs: ", prettykeys(ow.outputs), show_averaging_schedule(averaging_schedule), "\n",
"├── array type: ", show_array_type(ow.array_type), "\n",
"├── including: ", ow.including, "\n",
"└── max filesize: ", pretty_filesize(ow.max_filesize))
"├── file_splitting: ", summary(ow.file_splitting), "\n",
"└── file size: ", pretty_filesize(filesize(ow.filepath)))
end

47 changes: 30 additions & 17 deletions src/OutputWriters/netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ using Oceananigans.Utils: versioninfo_with_gpu, oceananigans_versioninfo, pretty
using Oceananigans.TimeSteppers: float_or_date_time
using Oceananigans.Fields: reduced_dimensions, reduced_location, location, validate_indices

mutable struct NetCDFOutputWriter{D, O, T, A} <: AbstractOutputWriter
mutable struct NetCDFOutputWriter{D, O, T, A, FS} <: AbstractOutputWriter
filepath :: String
dataset :: D
outputs :: O
Expand All @@ -24,7 +24,7 @@ mutable struct NetCDFOutputWriter{D, O, T, A} <: AbstractOutputWriter
overwrite_existing :: Bool
deflatelevel :: Int
part :: Int
max_filesize :: Float64
file_splitting :: FS
verbose :: Bool
end

Expand Down Expand Up @@ -173,7 +173,7 @@ end
overwrite_existing = false,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
file_splitting = NoFileSplitting(),
verbose = false)

Construct a `NetCDFOutputWriter` that writes `(label, output)` pairs in `outputs` (which should
Expand Down Expand Up @@ -222,15 +222,19 @@ Keyword arguments
and 9 means maximum compression). See [NCDatasets.jl documentation](https://alexander-barth.github.io/NCDatasets.jl/stable/variables/#Creating-a-variable)
for more information.

- `part`: The starting part number used if `max_filesize` is finite.
Default: 1.

- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
Defaults to `Inf`.
- `file_splitting`: Schedule for splitting the output file. The new files will be suffixed with
`_part1`, `_part2`, etc. For example `file_splitting = FileSizeLimit(sz)` will
split the output file when its size exceeds `sz`. Another example is
`file_splitting = TimeInterval(30days)`, which will split files every 30 days of
simulation time. The default incurs no splitting (`NoFileSplitting()`).

## Miscellaneous keywords

- `verbose`: Log what the output writer is doing with statistics on compute/write times and file sizes.
Default: `false`.

- `part`: The starting part number used when file splitting.

- `global_attributes`: Dict of model properties to save with every file. Default: `Dict()`.

- `output_attributes`: Dict of attributes to be saved with each field variable (reasonable
Expand Down Expand Up @@ -354,12 +358,14 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
overwrite_existing = nothing,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
file_splitting = NoFileSplitting(),
verbose = false)
mkpath(dir)
filename = auto_extension(filename, ".nc")
filepath = joinpath(dir, filename)

update_file_splitting_schedule!(file_splitting, filepath)

if isnothing(overwrite_existing)
if isfile(filepath)
overwrite_existing = false
Expand Down Expand Up @@ -415,7 +421,7 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
overwrite_existing,
deflatelevel,
part,
max_filesize,
file_splitting,
verbose)
end

Expand Down Expand Up @@ -485,9 +491,9 @@ Write output to netcdf file `output_writer.filepath` at specified intervals. Inc
every time an output is written to the file.
"""
function write_output!(ow::NetCDFOutputWriter, model)
# TODO allow user to split by number of snapshots, rathern than filesize.
# Start a new file if the filesize exceeds max_filesize
filesize(ow.filepath) ≥ ow.max_filesize && start_next_file(model, ow)
# Start a new file if the file_splitting(model) is true
ow.file_splitting(model) && start_next_file(model, ow)
update_file_splitting_schedule!(ow.file_splitting, ow.filepath)

ow.dataset = open(ow)

Expand Down Expand Up @@ -556,7 +562,9 @@ function Base.show(io::IO, ow::NetCDFOutputWriter)
"├── filepath: ", ow.filepath, "\n",
"├── dimensions: $dims", "\n",
"├── $Noutputs outputs: ", prettykeys(ow.outputs), show_averaging_schedule(averaging_schedule), "\n",
"└── array type: ", show_array_type(ow.array_type))
"└── array type: ", show_array_type(ow.array_type),
"├── file_splitting: ", summary(ow.file_splitting), "\n",
"└── file size: ", pretty_filesize(filesize(ow.filepath)))
end

#####
Expand All @@ -577,11 +585,16 @@ dictify(outputs::LagrangianParticles) = Dict("particles" => outputs)
default_dimensions(outputs::Dict{String,<:LagrangianParticles}, grid, indices, with_halos) =
Dict("particle_id" => collect(1:length(outputs["particles"])))

#####
##### File splitting
#####

function start_next_file(model, ow::NetCDFOutputWriter)
verbose = ow.verbose
sz = filesize(ow.filepath)

verbose && @info begin
"Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(ow.max_filesize))."
schedule_type = summary(ow.file_splitting)
"Splitting output because $(schedule_type) has been actuated."
navidcy marked this conversation as resolved.
Show resolved Hide resolved
end

if ow.part == 1
Expand Down
41 changes: 41 additions & 0 deletions src/OutputWriters/output_writer_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,52 @@ using Oceananigans.Fields: AbstractField, indices, boundary_conditions, instanti
using Oceananigans.BoundaryConditions: bc_str, FieldBoundaryConditions, ContinuousBoundaryFunction, DiscreteBoundaryFunction
using Oceananigans.TimeSteppers: QuasiAdamsBashforth2TimeStepper, RungeKutta3TimeStepper
using Oceananigans.Models.LagrangianParticleTracking: LagrangianParticles
using Oceananigans.Utils: AbstractSchedule

#####
##### Output writer utilities
#####

mutable struct FileSizeLimit <: AbstractSchedule
size_limit :: Float64
path :: String
end

"""
FileSizeLimit(size_limit [, path=""])

Return a schedule that actuates when the file at `path` exceeds
the `size_limit`.

The `path` is automatically added and updated when `FileSizeLimit` is
used with an output writer, and should not be provided manually.
"""
FileSizeLimit(size_limit) = FileSizeLimit(size_limit, "")

(fsl::FileSizeLimit)(model) = filesize(fsl.path) >= fsl.size_limit
navidcy marked this conversation as resolved.
Show resolved Hide resolved

function Base.summary(fsl::FileSizeLimit)
current_size_str = pretty_filesize(filesize(fsl.path))
size_limit_str = pretty_filesize(fsl.size_limit)
return string("FileSizeLimit(size_limit=", size_limit_str,
", path=", fsl.path, " (", current_size_str, ")")
end

Base.show(io::IO, fsl::FileSizeLimit) = print(io, summary(fsl))

# Update schedule based on user input
update_file_splitting_schedule!(schedule, filepath) = nothing

function update_file_splitting_schedule!(schedule::FileSizeLimit, filepath)
schedule.path = filepath
return nothing
end

struct NoFileSplitting end
(::NoFileSplitting)(model) = false
Base.summary(::NoFileSplitting) = "NoFileSplitting"
Base.show(io::IO, nfs::NoFileSplitting) = print(io, summary(nfs))

"""
ext(ow)

Expand Down
4 changes: 2 additions & 2 deletions test/test_jld2_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function test_jld2_file_splitting(arch)
function fake_bc_init(file, model)
file["boundary_conditions/fake"] = π
end

navidcy marked this conversation as resolved.
Show resolved Hide resolved
ow = JLD2OutputWriter(model, (; u=model.velocities.u);
dir = ".",
filename = "test.jld2",
Expand All @@ -58,7 +58,7 @@ function test_jld2_file_splitting(arch)
including = [:grid],
array_type = Array{Float64},
with_halos = true,
max_filesize = 200KiB,
file_splitting = FileSizeLimit(200KiB),
overwrite_existing = true)

push!(simulation.output_writers, ow)
Expand Down
Loading