diff --git a/src/OutputWriters/netcdf_output_writer.jl b/src/OutputWriters/netcdf_output_writer.jl
index f3dc9528d5..0c02e6f627 100644
--- a/src/OutputWriters/netcdf_output_writer.jl
+++ b/src/OutputWriters/netcdf_output_writer.jl
@@ -15,9 +15,16 @@ mutable struct NetCDFOutputWriter{D, O, T, A} <: AbstractOutputWriter
     dataset :: D
     outputs :: O
     schedule :: T
-    overwrite_existing :: Bool
     array_type :: A
-    previous :: Float64
+    indices :: Tuple
+    with_halos :: Bool
+    global_attributes :: Dict
+    output_attributes :: Dict
+    dimensions :: Dict
+    overwrite_existing :: Bool
+    deflatelevel :: Int
+    part :: Int
+    max_filesize :: Float64
     verbose :: Bool
 end
 
@@ -165,6 +172,8 @@ end
                        dimensions = Dict(),
                        overwrite_existing = false,
                        deflatelevel = 0,
+                       part = 1,
+                       max_filesize = Inf,
                        verbose = false)
 
 Construct a `NetCDFOutputWriter` that writes `(label, output)` pairs in `outputs` (which should
@@ -213,6 +222,13 @@ Keyword arguments
                   and 9 means maximum compression). See [NCDatasets.jl documentation](https://alexander-barth.github.io/NCDatasets.jl/stable/variables/#Creating-a-variable)
                   for more information.
 
+- `part`: The starting part number used if `max_filesize` is finite.
+          Default: 1.
+
+- `max_filesize`: The writer will stop writing to the output file once the file size exceeds `max_filesize`,
+                  and write to a new one with a consistent naming scheme ending in `part1`, `part2`, etc.
+                  Default: `Inf`.
+
 ## Miscellaneous keywords
 
 - `global_attributes`: Dict of model properties to save with every file. Default: `Dict()`.
@@ -337,8 +353,9 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
                             dimensions = Dict(),
                             overwrite_existing = nothing,
                             deflatelevel = 0,
+                            part = 1,
+                            max_filesize = Inf,
                             verbose = false)
-
     mkpath(dir)
     filename = auto_extension(filename, ".nc")
     filepath = joinpath(dir, filename)
@@ -357,17 +374,14 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
 
         elseif isfile(filepath) && overwrite_existing
             @warn "Overwriting existing $filepath."
-
         end
     end
-
-    mode = overwrite_existing ? "c" : "a"
-
     # TODO: This call to dictify is only necessary because "dictify" is hacked to help
     # with LagrangianParticles output (see the end of the file).
     # We shouldn't support this in the future; we should require users to 'name' LagrangianParticles output.
     outputs = dictify(outputs)
     outputs = Dict(string(name) => construct_output(outputs[name], model.grid, indices, with_halos) for name in keys(outputs))
+
     output_attributes = dictify(output_attributes)
     global_attributes = dictify(global_attributes)
     dimensions = dictify(dimensions)
@@ -375,59 +389,34 @@ function NetCDFOutputWriter(model, outputs; filename, schedule,
     # Ensure we can add any kind of metadata to the global attributes later by converting to Dict{Any, Any}.
     global_attributes = Dict{Any, Any}(global_attributes)
 
-    # Add useful metadata
-    global_attributes["date"] = "This file was generated on $(now())."
-    global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
-    global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()
-
-    add_schedule_metadata!(global_attributes, schedule)
-
-    # Convert schedule to TimeInterval and each output to WindowedTimeAverage if
-    # schedule::AveragedTimeInterval
-    schedule, outputs = time_average_outputs(schedule, outputs, model)
-
-    dims = default_dimensions(outputs, model.grid, indices, with_halos)
-
-    # Open the NetCDF dataset file
-    dataset = NCDataset(filepath, mode, attrib=global_attributes)
-
-    default_dimension_attributes = get_default_dimension_attributes(model.grid)
-
-    # Define variables for each dimension and attributes if this is a new file.
-    if mode == "c"
-        for (dim_name, dim_array) in dims
-            defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
-                   deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
-        end
-
-        # DateTime and TimeDate are both <: AbstractTime
-        time_attrib = model.clock.time isa AbstractTime ?
-            Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
-            Dict("long_name" => "Time", "units" => "seconds")
-
-        # Creates an unlimited dimension "time"
-        defDim(dataset, "time", Inf)
-        defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)
-
-        # Use default output attributes for known outputs if the user has not specified any.
-        # Unknown outputs get an empty tuple (no output attributes).
-        for c in keys(outputs)
-            if !haskey(output_attributes, c)
-                output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
-            end
-        end
-
-        for (name, output) in outputs
-            attributes = try output_attributes[name]; catch; Dict(); end
-            define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
-        end
-
-        sync(dataset)
-    end
-
-    close(dataset)
-
-    return NetCDFOutputWriter(filepath, dataset, outputs, schedule, overwrite_existing, array_type, 0.0, verbose)
+    dataset, outputs, schedule = initialize_nc_file!(filepath,
+                                                     outputs,
+                                                     schedule,
+                                                     array_type,
+                                                     indices,
+                                                     with_halos,
+                                                     global_attributes,
+                                                     output_attributes,
+                                                     dimensions,
+                                                     overwrite_existing,
+                                                     deflatelevel,
+                                                     model)
+
+    return NetCDFOutputWriter(filepath,
+                              dataset,
+                              outputs,
+                              schedule,
+                              array_type,
+                              indices,
+                              with_halos,
+                              global_attributes,
+                              output_attributes,
+                              dimensions,
+                              overwrite_existing,
+                              deflatelevel,
+                              part,
+                              max_filesize,
+                              verbose)
 end
 
 get_default_dimension_attributes(::RectilinearGrid) =
@@ -492,10 +481,14 @@ end
 """
     write_output!(output_writer, model)
 
-Writes output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
+Write output to netcdf file `output_writer.filepath` at specified intervals. Increments the `time` dimension
 every time an output is written to the file.
 """
 function write_output!(ow::NetCDFOutputWriter, model)
+    # TODO allow user to split by number of snapshots, rather than filesize.
+    # Start a new file if the filesize exceeds max_filesize
+    filesize(ow.filepath) ≥ ow.max_filesize && start_next_file(model, ow)
+
     ow.dataset = open(ow)
 
     ds, verbose, filepath = ow.dataset, ow.verbose, ow.filepath
@@ -583,3 +576,132 @@ dictify(outputs::LagrangianParticles) = Dict("particles" => outputs)
 
 default_dimensions(outputs::Dict{String,<:LagrangianParticles}, grid, indices, with_halos) =
     Dict("particle_id" => collect(1:length(outputs["particles"])))
+
+"""
+    start_next_file(model, ow::NetCDFOutputWriter)
+
+Start writing to the next part file once the current file has reached `ow.max_filesize`.
+
+On the first split the current file is renamed to end in `_part1.nc`. Then `ow.part` is
+incremented, `ow.filepath` is pointed at the new part, and that file is initialized with
+`initialize_nc_file!`.
+"""
+function start_next_file(model, ow::NetCDFOutputWriter)
+    verbose = ow.verbose
+    sz = filesize(ow.filepath)
+    verbose && @info begin
+        "Filesize $(pretty_filesize(sz)) has exceeded maximum file size $(pretty_filesize(ow.max_filesize))."
+    end
+
+    if ow.part == 1
+        part1_path = replace(ow.filepath, r"\.nc$" => "_part1.nc")
+        verbose && @info "Renaming first part: $(ow.filepath) -> $part1_path"
+        mv(ow.filepath, part1_path, force=ow.overwrite_existing)
+        ow.filepath = part1_path
+    end
+
+    ow.part += 1
+    ow.filepath = replace(ow.filepath, r"part\d+\.nc$" => "part" * string(ow.part) * ".nc")
+    ow.overwrite_existing && isfile(ow.filepath) && rm(ow.filepath, force=true)
+    verbose && @info "Now writing to: $(ow.filepath)"
+
+    initialize_nc_file!(ow, model)
+
+    return nothing
+end
+
+"""
+    initialize_nc_file!(filepath, outputs, schedule, array_type, indices, with_halos,
+                        global_attributes, output_attributes, dimensions,
+                        overwrite_existing, deflatelevel, model)
+
+Open the NetCDF file at `filepath` with `global_attributes` and, if the file is being
+created, define the dimension, time and output variables. Return the (closed) `dataset`
+together with the `outputs` and `schedule` returned by `time_average_outputs`.
+"""
+function initialize_nc_file!(filepath,
+                             outputs,
+                             schedule,
+                             array_type,
+                             indices,
+                             with_halos,
+                             global_attributes,
+                             output_attributes,
+                             dimensions,
+                             overwrite_existing,
+                             deflatelevel,
+                             model)
+
+    # Create the file when overwriting or when it does not exist yet (e.g. a newly started
+    # part file); append only to a file that is already on disk.
+    mode = (overwrite_existing || !isfile(filepath)) ? "c" : "a"
+
+    # Add useful metadata
+    global_attributes["date"] = "This file was generated on $(now())."
+    global_attributes["Julia"] = "This file was generated using " * versioninfo_with_gpu()
+    global_attributes["Oceananigans"] = "This file was generated using " * oceananigans_versioninfo()
+
+    add_schedule_metadata!(global_attributes, schedule)
+
+    # Convert schedule to TimeInterval and each output to WindowedTimeAverage if
+    # schedule::AveragedTimeInterval
+    schedule, outputs = time_average_outputs(schedule, outputs, model)
+
+    dims = default_dimensions(outputs, model.grid, indices, with_halos)
+
+    # Open the NetCDF dataset file
+    dataset = NCDataset(filepath, mode, attrib=global_attributes)
+
+    default_dimension_attributes = get_default_dimension_attributes(model.grid)
+
+    # Define variables for each dimension and attributes if this is a new file.
+    if mode == "c"
+        for (dim_name, dim_array) in dims
+            defVar(dataset, dim_name, array_type(dim_array), (dim_name,),
+                   deflatelevel=deflatelevel, attrib=default_dimension_attributes[dim_name])
+        end
+
+        # DateTime and TimeDate are both <: AbstractTime
+        time_attrib = model.clock.time isa AbstractTime ?
+            Dict("long_name" => "Time", "units" => "seconds since 2000-01-01 00:00:00") :
+            Dict("long_name" => "Time", "units" => "seconds")
+
+        # Creates an unlimited dimension "time"
+        defDim(dataset, "time", Inf)
+        defVar(dataset, "time", eltype(model.grid), ("time",), attrib=time_attrib)
+
+        # Use default output attributes for known outputs if the user has not specified any.
+        # Unknown outputs get an empty tuple (no output attributes).
+        for c in keys(outputs)
+            if !haskey(output_attributes, c)
+                output_attributes[c] = c in keys(default_output_attributes) ? default_output_attributes[c] : ()
+            end
+        end
+
+        for (name, output) in outputs
+            attributes = try output_attributes[name]; catch; Dict(); end
+            define_output_variable!(dataset, output, name, array_type, deflatelevel, attributes, dimensions)
+        end
+
+        sync(dataset)
+    end
+
+    close(dataset)
+
+    return dataset, outputs, schedule
+end
+
+# Convenience method: initialize the file `ow.filepath` using the settings stored on `ow`.
+initialize_nc_file!(ow::NetCDFOutputWriter, model) =
+    initialize_nc_file!(ow.filepath,
+                        ow.outputs,
+                        ow.schedule,
+                        ow.array_type,
+                        ow.indices,
+                        ow.with_halos,
+                        ow.global_attributes,
+                        ow.output_attributes,
+                        ow.dimensions,
+                        ow.overwrite_existing,
+                        ow.deflatelevel,
+                        model)
diff --git a/test/test_netcdf_output_writer.jl b/test/test_netcdf_output_writer.jl
index a276cac9b9..d7a59143ba 100644
--- a/test/test_netcdf_output_writer.jl
+++ b/test/test_netcdf_output_writer.jl
@@ -45,6 +45,53 @@ function test_DateTime_netcdf_output(arch)
     return nothing
 end
 
+function test_netcdf_file_splitting(arch)
+    grid = RectilinearGrid(arch, size=(16, 16, 16), extent=(1, 1, 1), halo=(1, 1, 1))
+    model = NonhydrostaticModel(; grid, buoyancy=SeawaterBuoyancy(), tracers=(:T, :S))
+    simulation = Simulation(model, Δt=1, stop_iteration=10)
+
+    fake_attributes = Dict("fake_attribute"=>"fake_attribute")
+
+    max_filesize = 200KiB
+
+    ow = NetCDFOutputWriter(model, (; u=model.velocities.u);
+                            dir = ".",
+                            filename = "test.nc",
+                            schedule = IterationInterval(1),
+                            array_type = Array{Float64},
+                            with_halos = true,
+                            global_attributes = fake_attributes,
+                            max_filesize,
+                            overwrite_existing = true)
+
+    push!(simulation.output_writers, ow)
+
+    # 531 KiB of output will be written which should get split into 3 files.
+    run!(simulation)
+
+    # Test that files have been split according to size as expected.
+    @test filesize("test_part1.nc") > max_filesize
+    @test filesize("test_part2.nc") > max_filesize
+    @test filesize("test_part3.nc") < max_filesize
+    @test !isfile("test_part4.nc")
+
+    for n in string.(1:3)
+        filename = "test_part$n.nc"
+        ds = NCDataset(filename,"r")
+        dimlength = length(keys(ds.dim))
+        # Test that all files contain the same dimensions.
+        @test dimlength == 7
+        # Test that all files contain the user defined attributes.
+        @test ds.attrib["fake_attribute"] == "fake_attribute"
+
+        # Leave test directory clean.
+        close(ds)
+        rm(filename)
+    end
+
+    return nothing
+end
+
 function test_TimeDate_netcdf_output(arch)
     grid = RectilinearGrid(arch, size=(1, 1, 1), extent=(1, 1, 1))
     clock = Clock(time=TimeDate(2021, 1, 1))
@@ -835,6 +882,7 @@ for arch in archs
     @testset "NetCDF output writer [$(typeof(arch))]" begin
         @info " Testing NetCDF output writer [$(typeof(arch))]..."
         test_DateTime_netcdf_output(arch)
+        test_netcdf_file_splitting(arch)
         test_TimeDate_netcdf_output(arch)
         test_thermal_bubble_netcdf_output(arch)
         test_thermal_bubble_netcdf_output_with_halos(arch)