Skip to content

Commit

Permalink
File splitting in NetCDFOutputWriter when max_filesize is exceeded (
Browse files Browse the repository at this point in the history
#3512)

* feat:add netcdf file splitting

* feat:add netcdf file splitting test

* fix: solve issue with test

* fix: solve issue with test

* fix: remove test_part files

* Update src/OutputWriters/netcdf_output_writer.jl

Co-authored-by: Navid C. Constantinou <[email protected]>

* Improve code style to make it easier to read

Co-authored-by: Navid C. Constantinou <[email protected]>

* Improve code style to make it easier to read

Co-authored-by: Navid C. Constantinou <[email protected]>

* Improve code style to make it easier to read

Co-authored-by: Navid C. Constantinou <[email protected]>

* Improve code style to make it easier to read

Co-authored-by: Navid C. Constantinou <[email protected]>

* Update src/OutputWriters/netcdf_output_writer.jl

Co-authored-by: Navid C. Constantinou <[email protected]>

* Update test/test_netcdf_output_writer.jl

Co-authored-by: Navid C. Constantinou <[email protected]>

* Update test/test_netcdf_output_writer.jl

Co-authored-by: Navid C. Constantinou <[email protected]>

* change previous -> part;

* add doc for part kwarg;

* better line breaking

* initialize_nc_file! returns modified schedule

* Add testing to ensure no extra file is created

Co-authored-by: Navid C. Constantinou <[email protected]>

---------

Co-authored-by: Navid C. Constantinou <[email protected]>
  • Loading branch information
josuemtzmo and navidcy committed Mar 15, 2024
1 parent f462e9f commit dccba69
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 61 deletions.
223 changes: 162 additions & 61 deletions src/OutputWriters/netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ mutable struct NetCDFOutputWriter{D, O, T, A} <: AbstractOutputWriter
dataset :: D
outputs :: O
schedule :: T
overwrite_existing :: Bool
array_type :: A
previous :: Float64
indices :: Tuple
with_halos :: Bool
global_attributes :: Dict
output_attributes :: Dict
dimensions :: Dict
overwrite_existing :: Bool
deflatelevel :: Int
part :: Int
max_filesize :: Float64
verbose :: Bool
end

Expand Down Expand Up @@ -165,6 +172,8 @@ end
dimensions = Dict(),
overwrite_existing = false,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
verbose = false)
Construct a `NetCDFOutputWriter` that writes `(label, output)` pairs in `outputs` (which should
Expand Down Expand Up @@ -213,6 +222,13 @@ Keyword arguments
and 9 means maximum compression). See [NCDatasets.jl documentation](https://alexander-barth.github.io/NCDatasets.jl/stable/variables/#Creating-a-variable)
for more information.
- `part`: The starting part number used if `max_filesize` is finite.
Default: 1.
- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
Defaults to `Inf`.
## Miscellaneous keywords
- `global_attributes`: Dict of model properties to save with every file. Default: `Dict()`.
Expand Down Expand Up @@ -337,8 +353,9 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
dimensions = Dict(),
overwrite_existing = nothing,
deflatelevel = 0,
part = 1,
max_filesize = Inf,
verbose = false)

mkpath(dir)
filename = auto_extension(filename, ".nc")
filepath = joinpath(dir, filename)
Expand All @@ -357,77 +374,49 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,

elseif isfile(filepath) && overwrite_existing
@warn "Overwriting existing $filepath."

end
end

mode = overwrite_existing ? "c" : "a"

# TODO: This call to dictify is only necessary because "dictify" is hacked to help
# with LagrangianParticles output (see the end of the file).
# We shouldn't support this in the future; we should require users to 'name' LagrangianParticles output.
outputs = dictify(outputs)
outputs = Dict(string(name) => construct_output(outputs[name], model.grid, indices, with_halos) for name in keys(outputs))

output_attributes = dictify(output_attributes)
global_attributes = dictify(global_attributes)
dimensions = dictify(dimensions)

# Ensure we can add any kind of metadata to the global attributes later by converting to Dict{Any, Any}.
global_attributes = Dict{Any, Any}(global_attributes)

# Add useful metadata
global_attributes["date"] = "This file was generated on $(now())."
global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()

add_schedule_metadata!(global_attributes, schedule)

# Convert schedule to TimeInterval and each output to WindowedTimeAverage if
# schedule::AveragedTimeInterval
schedule, outputs = time_average_outputs(schedule, outputs, model)

dims = default_dimensions(outputs, model.grid, indices, with_halos)

# Open the NetCDF dataset file
dataset = NCDataset(filepath, mode, attrib=global_attributes)

default_dimension_attributes = get_default_dimension_attributes(model.grid)

# Define variables for each dimension and attributes if this is a new file.
if mode == "c"
for (dim_name, dim_array) in dims
defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
end

# DateTime and TimeDate are both <: AbstractTime
time_attrib = model.clock.time isa AbstractTime ?
Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
Dict("long_name" => "Time", "units" => "seconds")

# Creates an unlimited dimension "time"
defDim(dataset, "time", Inf)
defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)

# Use default output attributes for known outputs if the user has not specified any.
# Unknown outputs get an empty tuple (no output attributes).
for c in keys(outputs)
if !haskey(output_attributes, c)
output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
end
end

for (name, output) in outputs
attributes = try output_attributes[name]; catch; Dict(); end
define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
end

sync(dataset)
end

close(dataset)

return NetCDFOutputWriter(filepath, dataset, outputs, schedule, overwrite_existing, array_type, 0.0, verbose)
dataset, outputs, schedule = initialize_nc_file!(filepath,
outputs,
schedule,
array_type,
indices,
with_halos,
global_attributes,
output_attributes,
dimensions,
overwrite_existing,
deflatelevel,
model)

return NetCDFOutputWriter(filepath,
dataset,
outputs,
schedule,
array_type,
indices,
with_halos,
global_attributes,
output_attributes,
dimensions,
overwrite_existing,
deflatelevel,
part,
max_filesize,
verbose)
end

get_default_dimension_attributes(::RectilinearGrid) =
Expand Down Expand Up @@ -492,10 +481,14 @@ end
"""
write_output!(output_writer, model)
Writes output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
Write output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
every time an output is written to the file.
"""
function write_output!(ow::NetCDFOutputWriter, model)
    # TODO allow user to split by number of snapshots, rather than filesize.
    # Start a new file if the filesize exceeds max_filesize
    filesize(ow.filepath) ≥ ow.max_filesize && start_next_file(model, ow)

ow.dataset = open(ow)

ds, verbose, filepath = ow.dataset, ow.verbose, ow.filepath
Expand Down Expand Up @@ -583,3 +576,111 @@ dictify(outputs::LagrangianParticles) = Dict("particles" => outputs)

default_dimensions(outputs::Dict{String,<:LagrangianParticles}, grid, indices, with_halos) =
Dict("particle_id" => collect(1:length(outputs["particles"])))

"""
    start_next_file(model, ow::NetCDFOutputWriter)

Stop writing to the current file and redirect `ow` to the next "part" file.

On the first split, the original file is renamed so it carries the `_part1`
suffix, keeping the naming scheme consistent across all parts. The part
counter is then incremented, the filepath is rewritten to point at the new
part, and a fresh NetCDF file is initialized via `initialize_nc_file!`.
"""
function start_next_file(model, ow::NetCDFOutputWriter)
    verbose = ow.verbose
    sz = filesize(ow.filepath)

    verbose && @info begin
        "Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(ow.max_filesize))."
    end

    # Retroactively rename the original file to "..._part1.nc" the first time
    # we split, so every output file follows the same "partN" naming scheme.
    # Note the escaped dot: r".nc$" would match any character before "nc".
    if ow.part == 1
        part1_path = replace(ow.filepath, r"\.nc$" => "_part1.nc")
        verbose && @info "Renaming first part: $(ow.filepath) -> $part1_path"
        mv(ow.filepath, part1_path, force=ow.overwrite_existing)
        ow.filepath = part1_path
    end

    ow.part += 1
    ow.filepath = replace(ow.filepath, r"part\d+\.nc$" => "part$(ow.part).nc")

    # Remove a stale file from a previous run so initialization starts clean.
    ow.overwrite_existing && isfile(ow.filepath) && rm(ow.filepath, force=true)
    verbose && @info "Now writing to: $(ow.filepath)"

    initialize_nc_file!(ow, model)

    return nothing
end

"""
    initialize_nc_file!(filepath, outputs, schedule, array_type, indices, with_halos,
                        global_attributes, output_attributes, dimensions,
                        overwrite_existing, deflatelevel, model)

Create (when `overwrite_existing`) or reopen the NetCDF file at `filepath`,
writing global attributes, dimension variables, the unlimited `"time"`
dimension, and one variable per entry in `outputs`.

Returns `(dataset, outputs, schedule)`: `outputs` and `schedule` may have been
transformed by `time_average_outputs` (e.g. for `AveragedTimeInterval`
schedules), so callers must use the returned values. The returned `dataset`
has been closed; it is reopened lazily when output is written.
"""
function initialize_nc_file!(filepath,
                             outputs,
                             schedule,
                             array_type,
                             indices,
                             with_halos,
                             global_attributes,
                             output_attributes,
                             dimensions,
                             overwrite_existing,
                             deflatelevel,
                             model)

    # "c" creates a fresh file; "a" appends to an existing one.
    mode = overwrite_existing ? "c" : "a"

    # Add useful metadata
    global_attributes["date"] = "This file was generated on $(now())."
    global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
    global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()

    add_schedule_metadata!(global_attributes, schedule)

    # Convert schedule to TimeInterval and each output to WindowedTimeAverage if
    # schedule::AveragedTimeInterval
    schedule, outputs = time_average_outputs(schedule, outputs, model)

    dims = default_dimensions(outputs, model.grid, indices, with_halos)

    # Open the NetCDF dataset file
    dataset = NCDataset(filepath, mode, attrib=global_attributes)

    default_dimension_attributes = get_default_dimension_attributes(model.grid)

    # Define variables for each dimension and attributes if this is a new file.
    if mode == "c"
        for (dim_name, dim_array) in dims
            defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
                   deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
        end

        # DateTime and TimeDate are both <: AbstractTime
        time_attrib = model.clock.time isa AbstractTime ?
            Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
            Dict("long_name" => "Time", "units" => "seconds")

        # Creates an unlimited dimension "time"
        defDim(dataset, "time", Inf)
        defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)

        # Use default output attributes for known outputs if the user has not specified any.
        # Unknown outputs get an empty tuple (no output attributes).
        for c in keys(outputs)
            if !haskey(output_attributes, c)
                output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
            end
        end

        for (name, output) in outputs
            # `get` rather than `try ... catch` for the lookup: fall back to an
            # empty attribute Dict when no attributes exist for this output.
            attributes = get(output_attributes, name, Dict())
            define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
        end

        sync(dataset)
    end

    close(dataset)

    return dataset, outputs, schedule
end

"""
    initialize_nc_file!(ow::NetCDFOutputWriter, model)

Initialize the NetCDF file for `ow` by forwarding all of the writer's stored
settings to the full `initialize_nc_file!` method.
"""
function initialize_nc_file!(ow::NetCDFOutputWriter, model)
    return initialize_nc_file!(ow.filepath,
                               ow.outputs,
                               ow.schedule,
                               ow.array_type,
                               ow.indices,
                               ow.with_halos,
                               ow.global_attributes,
                               ow.output_attributes,
                               ow.dimensions,
                               ow.overwrite_existing,
                               ow.deflatelevel,
                               model)
end
48 changes: 48 additions & 0 deletions test/test_netcdf_output_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,53 @@ function test_DateTime_netcdf_output(arch)
return nothing
end

function test_netcdf_file_splitting(arch)
    # Small model producing enough output to force the writer to split files.
    grid = RectilinearGrid(arch, size=(16, 16, 16), extent=(1, 1, 1), halo=(1, 1, 1))
    model = NonhydrostaticModel(; grid, buoyancy=SeawaterBuoyancy(), tracers=(:T, :S))
    simulation = Simulation(model, Δt=1, stop_iteration=10)

    fake_attributes = Dict("fake_attribute"=>"fake_attribute")

    max_filesize = 200KiB

    writer = NetCDFOutputWriter(model, (; u=model.velocities.u);
                                dir = ".",
                                filename = "test.nc",
                                schedule = IterationInterval(1),
                                array_type = Array{Float64},
                                with_halos = true,
                                global_attributes = fake_attributes,
                                max_filesize,
                                overwrite_existing = true)

    push!(simulation.output_writers, writer)

    # 531 KiB of output will be written which should get split into 3 files.
    run!(simulation)

    # Parts 1 and 2 should each have exceeded max_filesize (triggering a split),
    # the final part stays below it, and no extra fourth part is created.
    @test filesize("test_part1.nc") > max_filesize
    @test filesize("test_part2.nc") > max_filesize
    @test filesize("test_part3.nc") < max_filesize
    @test !isfile("test_part4.nc")

    for part in 1:3
        part_filename = "test_part$part.nc"
        dataset = NCDataset(part_filename, "r")

        # Every part must carry the same dimensions ...
        @test length(keys(dataset.dim)) == 7

        # ... and the user-defined global attributes.
        @test dataset.attrib["fake_attribute"] == "fake_attribute"

        # Leave test directory clean.
        close(dataset)
        rm(part_filename)
    end

    return nothing
end

function test_TimeDate_netcdf_output(arch)
grid = RectilinearGrid(arch, size=(1, 1, 1), extent=(1, 1, 1))
clock = Clock(time=TimeDate(2021, 1, 1))
Expand Down Expand Up @@ -835,6 +882,7 @@ for arch in archs
@testset "NetCDF output writer [$(typeof(arch))]" begin
@info " Testing NetCDF output writer [$(typeof(arch))]..."
test_DateTime_netcdf_output(arch)
test_netcdf_file_splitting(arch)
test_TimeDate_netcdf_output(arch)
test_thermal_bubble_netcdf_output(arch)
test_thermal_bubble_netcdf_output_with_halos(arch)
Expand Down

0 comments on commit dccba69

Please sign in to comment.