Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File splitting in NetCDFOutputWriter when max_filesize is exceeded #3512

Merged
merged 19 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 162 additions & 61 deletions src/OutputWriters/netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ mutable struct NetCDFOutputWriter{D, O, T, A} <: AbstractOutputWriter
dataset :: D
outputs :: O
schedule :: T
overwrite_existing :: Bool
array_type :: A
previous :: Float64
indices :: Tuple
with_halos :: Bool
global_attributes :: Dict
output_attributes :: Dict
dimensions :: Dict
overwrite_existing :: Bool
deflatelevel :: Int
part :: Int
max_filesize :: Float64
verbose :: Bool
end

Expand Down Expand Up @@ -165,6 +172,8 @@ end
dimensions = Dict(),
overwrite_existing = false,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
verbose = false)

Construct a `NetCDFOutputWriter` that writes `(label, output)` pairs in `outputs` (which should
Expand Down Expand Up @@ -213,6 +222,13 @@ Keyword arguments
and 9 means maximum compression). See [NCDatasets.jl documentation](https://alexander-barth.github.io/NCDatasets.jl/stable/variables/#Creating-a-variable)
for more information.

- `part`: The starting part number used if `max_filesize` is finite.
Default: 1.

- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
Defaults to `Inf`.

## Miscellaneous keywords

- `global_attributes`: Dict of model properties to save with every file. Default: `Dict()`.
Expand Down Expand Up @@ -337,8 +353,9 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
dimensions = Dict(),
overwrite_existing = nothing,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
verbose = false)

mkpath(dir)
filename = auto_extension(filename, ".nc")
filepath = joinpath(dir, filename)
Expand All @@ -357,77 +374,49 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,

elseif isfile(filepath) && overwrite_existing
@warn "Overwriting existing $filepath."

end
end

mode = overwrite_existing ? "c" : "a"

# TODO: This call to dictify is only necessary because "dictify" is hacked to help
# with LagrangianParticles output (see the end of the file).
# We shouldn't support this in the future; we should require users to 'name' LagrangianParticles output.
outputs = dictify(outputs)
outputs = Dict(string(name) => construct_output(outputs[name], model.grid, indices, with_halos) for name in keys(outputs))

output_attributes = dictify(output_attributes)
global_attributes = dictify(global_attributes)
dimensions = dictify(dimensions)

# Ensure we can add any kind of metadata to the global attributes later by converting to Dict{Any, Any}.
global_attributes = Dict{Any, Any}(global_attributes)

# Add useful metadata
global_attributes["date"] = "This file was generated on $(now())."
global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()

add_schedule_metadata!(global_attributes, schedule)

# Convert schedule to TimeInterval and each output to WindowedTimeAverage if
# schedule::AveragedTimeInterval
schedule, outputs = time_average_outputs(schedule, outputs, model)

dims = default_dimensions(outputs, model.grid, indices, with_halos)

# Open the NetCDF dataset file
dataset = NCDataset(filepath, mode, attrib=global_attributes)

default_dimension_attributes = get_default_dimension_attributes(model.grid)

# Define variables for each dimension and attributes if this is a new file.
if mode == "c"
for (dim_name, dim_array) in dims
defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
end

# DateTime and TimeDate are both <: AbstractTime
time_attrib = model.clock.time isa AbstractTime ?
Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
Dict("long_name" => "Time", "units" => "seconds")

# Creates an unlimited dimension "time"
defDim(dataset, "time", Inf)
defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)

# Use default output attributes for known outputs if the user has not specified any.
# Unknown outputs get an empty tuple (no output attributes).
for c in keys(outputs)
if !haskey(output_attributes, c)
output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
end
end

for (name, output) in outputs
attributes = try output_attributes[name]; catch; Dict(); end
define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
end

sync(dataset)
end

close(dataset)

return NetCDFOutputWriter(filepath, dataset, outputs, schedule, overwrite_existing, array_type, 0.0, verbose)
dataset, outputs, schedule = initialize_nc_file!(filepath,
outputs,
schedule,
array_type,
indices,
with_halos,
global_attributes,
output_attributes,
dimensions,
overwrite_existing,
deflatelevel,
model)

return NetCDFOutputWriter(filepath,
dataset,
outputs,
schedule,
array_type,
indices,
with_halos,
global_attributes,
output_attributes,
dimensions,
overwrite_existing,
deflatelevel,
part,
max_filesize,
verbose)
end

get_default_dimension_attributes(::RectilinearGrid) =
Expand Down Expand Up @@ -492,10 +481,14 @@ end
"""
write_output!(output_writer, model)

Writes output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
Write output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
every time an output is written to the file.
"""
function write_output!(ow::NetCDFOutputWriter, model)
    # TODO allow user to split by number of snapshots, rather than by filesize.
# Start a new file if the filesize exceeds max_filesize
filesize(ow.filepath) ≥ ow.max_filesize && start_next_file(model, ow)

ow.dataset = open(ow)

ds, verbose, filepath = ow.dataset, ow.verbose, ow.filepath
Expand Down Expand Up @@ -583,3 +576,111 @@ dictify(outputs::LagrangianParticles) = Dict("particles" => outputs)

default_dimensions(outputs::Dict{String,<:LagrangianParticles}, grid, indices, with_halos) =
Dict("particle_id" => collect(1:length(outputs["particles"])))

"""
    start_next_file(model, ow::NetCDFOutputWriter)

Close out the current output file of `ow` and redirect future output to the next
part file. On the first split, the existing file `name.nc` is renamed to
`name_part1.nc`; thereafter `ow.part` is incremented and the `partN` suffix in
`ow.filepath` is bumped accordingly. The new part file is initialized via
`initialize_nc_file!(ow, model)`.

Mutates `ow.part` and `ow.filepath`; returns `nothing`.
"""
function start_next_file(model, ow::NetCDFOutputWriter)
    verbose = ow.verbose
    sz = filesize(ow.filepath)
    verbose && @info begin
        "Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(ow.max_filesize))."
    end

    if ow.part == 1
        # Escape the dot so the regex matches a literal ".nc" suffix
        # (an unescaped `.` would match any character).
        part1_path = replace(ow.filepath, r"\.nc$" => "_part1.nc")
        verbose && @info "Renaming first part: $(ow.filepath) -> $part1_path"
        mv(ow.filepath, part1_path, force=ow.overwrite_existing)
        ow.filepath = part1_path
    end

    ow.part += 1
    ow.filepath = replace(ow.filepath, r"part\d+\.nc$" => "part" * string(ow.part) * ".nc")
    # Remove any stale file from a previous run so NCDataset creates a fresh one.
    ow.overwrite_existing && isfile(ow.filepath) && rm(ow.filepath, force=true)
    verbose && @info "Now writing to: $(ow.filepath)"

    initialize_nc_file!(ow, model)

    return nothing
end

"""
    initialize_nc_file!(filepath, outputs, schedule, array_type, indices, with_halos,
                        global_attributes, output_attributes, dimensions,
                        overwrite_existing, deflatelevel, model)

Create (if `overwrite_existing`) or append to (otherwise) the NetCDF file at
`filepath`, stamping it with metadata and — for new files — defining the spatial
dimensions, the unlimited `"time"` dimension, and a variable per output.

The dataset is synced and closed before returning, so callers must reopen it
before writing data. Returns `(dataset, outputs, schedule)`, where `schedule`
and `outputs` may have been converted by `time_average_outputs`.
"""
function initialize_nc_file!(filepath,
                             outputs,
                             schedule,
                             array_type,
                             indices,
                             with_halos,
                             global_attributes,
                             output_attributes,
                             dimensions,
                             overwrite_existing,
                             deflatelevel,
                             model)

    # "c" clobbers (creates) the file; "a" appends to an existing one.
    mode = overwrite_existing ? "c" : "a"

    # Add useful metadata
    global_attributes["date"] = "This file was generated on $(now())."
    global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
    global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()

    add_schedule_metadata!(global_attributes, schedule)

    # Convert schedule to TimeInterval and each output to WindowedTimeAverage if
    # schedule::AveragedTimeInterval
    schedule, outputs = time_average_outputs(schedule, outputs, model)

    dims = default_dimensions(outputs, model.grid, indices, with_halos)

    # Open the NetCDF dataset file
    dataset = NCDataset(filepath, mode, attrib=global_attributes)

    default_dimension_attributes = get_default_dimension_attributes(model.grid)

    # Define variables for each dimension and attributes if this is a new file.
    if mode == "c"
        for (dim_name, dim_array) in dims
            defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
                   deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
        end

        # DateTime and TimeDate are both <: AbstractTime
        time_attrib = model.clock.time isa AbstractTime ?
            Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
            Dict("long_name" => "Time", "units" => "seconds")

        # Creates an unlimited dimension "time"
        defDim(dataset, "time", Inf)
        defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)

        # Use default output attributes for known outputs if the user has not specified any.
        # Unknown outputs get an empty tuple (no output attributes).
        for c in keys(outputs)
            if !haskey(output_attributes, c)
                output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
            end
        end

        for (name, output) in outputs
            # `get` avoids try/catch-based control flow: fall back to an empty
            # attribute Dict when the user supplied none for this output.
            attributes = get(output_attributes, name, Dict())
            define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
        end

        sync(dataset)
    end

    close(dataset)

    return dataset, outputs, schedule
end

"""
    initialize_nc_file!(ow::NetCDFOutputWriter, model)

Initialize the NetCDF file at `ow.filepath` using the settings stored in `ow`
by forwarding to the full-argument `initialize_nc_file!` method.
"""
initialize_nc_file!(ow::NetCDFOutputWriter, model) =
    initialize_nc_file!(ow.filepath, ow.outputs, ow.schedule, ow.array_type,
                        ow.indices, ow.with_halos, ow.global_attributes,
                        ow.output_attributes, ow.dimensions, ow.overwrite_existing,
                        ow.deflatelevel, model)
48 changes: 48 additions & 0 deletions test/test_netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,53 @@ function test_DateTime_netcdf_output(arch)
return nothing
end

# Verify that NetCDFOutputWriter splits output across part files once
# `max_filesize` is exceeded, and that each part carries the same dimensions
# and user-supplied global attributes.
function test_netcdf_file_splitting(arch)
    grid = RectilinearGrid(arch, size=(16, 16, 16), extent=(1, 1, 1), halo=(1, 1, 1))
    model = NonhydrostaticModel(; grid, buoyancy=SeawaterBuoyancy(), tracers=(:T, :S))
    simulation = Simulation(model, Δt=1, stop_iteration=10)

    fake_attributes = Dict("fake_attribute" => "fake_attribute")

    max_filesize = 200KiB

    ow = NetCDFOutputWriter(model, (; u=model.velocities.u);
                            dir = ".",
                            filename = "test.nc",
                            schedule = IterationInterval(1),
                            array_type = Array{Float64},
                            with_halos = true,
                            global_attributes = fake_attributes,
                            max_filesize,
                            overwrite_existing = true)

    push!(simulation.output_writers, ow)

    # 531 KiB of output will be written which should get split into 3 files.
    run!(simulation)

    # Test that the output has been split into parts by size as expected.
    @test filesize("test_part1.nc") > max_filesize
    @test filesize("test_part2.nc") > max_filesize
    @test filesize("test_part3.nc") < max_filesize
    @test !isfile("test_part4.nc")

    for n in string.(1:3)
        filename = "test_part$n.nc"

        # The do-block form guarantees the dataset is closed even if an
        # error is thrown while inspecting it.
        NCDataset(filename, "r") do ds
            # Test that all files contain the same dimensions.
            @test length(keys(ds.dim)) == 7

            # Test that all files contain the user defined attributes.
            @test ds.attrib["fake_attribute"] == "fake_attribute"
        end

        # Leave test directory clean.
        rm(filename)
    end

    return nothing
end

function test_TimeDate_netcdf_output(arch)
grid = RectilinearGrid(arch, size=(1, 1, 1), extent=(1, 1, 1))
clock = Clock(time=TimeDate(2021, 1, 1))
Expand Down Expand Up @@ -835,6 +882,7 @@ for arch in archs
@testset "NetCDF output writer [$(typeof(arch))]" begin
@info " Testing NetCDF output writer [$(typeof(arch))]..."
test_DateTime_netcdf_output(arch)
test_netcdf_file_splitting(arch)
test_TimeDate_netcdf_output(arch)
test_thermal_bubble_netcdf_output(arch)
test_thermal_bubble_netcdf_output_with_halos(arch)
Expand Down