Module ocean_science_utilities.wavephysics.balance.wind_inversion
Expand source code
import numba # type: ignore
import numpy as np
import xarray
from numba_progress import ProgressBar # type: ignore
from typing import Optional, List, Tuple
from ocean_science_utilities.wavephysics.balance._numba_settings import (
numba_nocache_parallel,
numba_nocache,
numba_default,
)
from ocean_science_utilities.wavephysics.balance.balance import SourceTermBalance
from ocean_science_utilities.wavephysics.balance.dissipation import (
_bulk_dissipation_direction_point,
)
from ocean_science_utilities.wavephysics.balance.solvers import numba_newton_raphson
from ocean_science_utilities.wavephysics.balance.stress import (
_roughness_estimate_point,
_total_stress_point,
)
from ocean_science_utilities.wavespectra.operations import numba_integrate_spectral_data
from ocean_science_utilities.wavespectra.spectrum import FrequencyDirectionSpectrum
def windspeed_and_direction_from_spectra(
balance: SourceTermBalance,
guess_u10: xarray.DataArray,
spectrum: FrequencyDirectionSpectrum,
jacobian: bool = False,
jacobian_parameters: Optional[List[str]] = None,
time_derivative_spectrum: Optional[FrequencyDirectionSpectrum] = None,
direction_iteration: bool = False,
) -> xarray.Dataset:
"""
:param bulk_rate:
:param guess_u10:
:param guess_direction:
:param spectrum:
:return:
"""
disable = spectrum.number_of_spectra < 100
if time_derivative_spectrum is None:
time_derivative_spectrum_values = np.zeros(spectrum.shape())
else:
time_derivative_spectrum_values = (
time_derivative_spectrum.variance_density.values
)
with ProgressBar(
total=spectrum.number_of_spectra,
disable=disable,
desc=f"Estimating U10 from {balance.generation.name} "
f"and {balance.dissipation.name} wind and dissipation source terms",
) as progress_bar:
if not jacobian:
speed, direction = _u10_from_spectra(
variance_density=spectrum.variance_density.values,
guess_u10=guess_u10.values,
depth=spectrum.depth.values,
wind_source_term_function=balance.generation._wind_source_term_function,
tail_stress_parametrization_function=(
balance.generation._tail_stress_parametrization_function
),
dissipation_source_term_function=(
balance.dissipation._dissipation_function
),
parameters_generation=balance.generation.parameters,
parameters_dissipation=balance.dissipation.parameters,
spectral_grid=balance.generation.spectral_grid(spectrum),
progress_bar=progress_bar,
time_derivative_spectrum=time_derivative_spectrum_values,
direction_iteration=direction_iteration,
)
else:
if jacobian_parameters is None:
raise ValueError(
"If gradients are requested a parameter list is required"
)
speed, direction, grad = _u10_from_spectra_gradient(
variance_density=spectrum.variance_density.values,
guess_u10=guess_u10.values,
depth=spectrum.depth.values,
wind_source_term_function=balance.generation._wind_source_term_function,
tail_stress_parametrization_function=(
balance.generation._tail_stress_parametrization_function
),
dissipation_source_term_function=(
balance.dissipation._dissipation_function
),
grad_parameters=numba.typed.List(jacobian_parameters),
parameters_generation=balance.generation.parameters,
parameters_dissipation=balance.dissipation.parameters,
spectral_grid=balance.generation.spectral_grid(spectrum),
time_derivative_spectrum=time_derivative_spectrum,
progress_bar=progress_bar,
direction_iteration=direction_iteration,
)
grad = xarray.DataArray(data=grad)
u10 = xarray.DataArray(
data=speed,
dims=spectrum.dims_space_time,
coords=spectrum.coords_space_time,
)
direction = xarray.DataArray(
data=direction,
dims=spectrum.dims_space_time,
coords=spectrum.coords_space_time,
)
if jacobian:
return xarray.Dataset(
data_vars={"u10": u10, "direction": direction, "jacobian": grad}
)
else:
return xarray.Dataset(data_vars={"u10": u10, "direction": direction})
@numba.jit(**numba_nocache)
def _u10_from_bulk_rate_point(
bulk_rate,
variance_density,
guess_u10,
guess_direction,
depth,
spectral_grid,
parameters,
wind_source_term_function,
tail_stress_parametrization_function,
time_derivative_spectrum,
direction_iteration,
):
"""
:param bulk_rate:
:param variance_density: The wave spectrum in m**2/Hz/deg
:param guess_u10:
:param guess_direction:
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
direction = guess_direction
u10 = guess_u10
if bulk_rate == 0.0:
return 0.0, guess_direction
# We are tryomg to solve for U10; ~ accuracy of 0.01m/s is sufficient.
# We do not really care about relative accuracy - for low winds (<1m/s)
# answers are really inaccurate anyway
atol = 1.0e-2
rtol = 1.0
numerical_stepsize = 1e-3
if direction_iteration:
niter = 20
else:
niter = 1
for ii in range(0, niter):
wind_guess = (u10, direction, "u10")
roughness_memory = [-1.0]
args = (
roughness_memory,
variance_density,
wind_guess,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
bulk_rate,
time_derivative_spectrum,
)
try:
u10 = numba_newton_raphson(
_u10_iteration_function,
u10,
args,
(0, np.inf),
atol=atol,
rtol=rtol,
numerical_stepsize=numerical_stepsize,
)
except:
u10 = np.nan
break
if direction_iteration:
wind_guess = (u10, direction, "u10")
_, new_direction = _total_stress_point(
roughness_memory[0],
variance_density,
wind_guess,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
)
direction_delta = (new_direction - direction + 180) % 360 - 180
if abs(direction_delta) < 1.0:
direction = new_direction
break
elif abs(direction_delta) < 10.0:
direction = new_direction
else:
# Some under-relaxation
direction = (direction + 0.5 * direction_delta) % 360
return u10, direction
@numba.jit(**numba_nocache)
def _u10_iteration_function(
u10,
memory_list,
variance_density,
wind_guess,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
bulk_rate,
time_derivative_spectrum,
):
"""
To find the 10 meter wind that generates a certain bulk input we need to solve the
inverse function for the wind input term. We define a function
F(U10) = wind_bulk_rate(u10) - specified_bulk_rate
and use a zero finder to solve for
F(U10) = 0.
This function defines the function F. The zero finder is responsible for calling it
with the correct trailing arguments.
:param wind_forcing: Wind input U10 we want to solve for.
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind_guess:
:param depth:
:param spectral_grid:
:param parameters:
:param bulk_rate:
:return:
"""
if u10 == 0.0:
return -bulk_rate
wind = (u10, wind_guess[1], wind_guess[2])
# Estimate the rougness length
roughness_length = _roughness_estimate_point(
memory_list[0],
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
)
memory_list[0] = roughness_length
# Calculate the wind input source term values
generation = wind_source_term_function(
variance_density,
wind,
depth,
roughness_length,
spectral_grid,
parameters,
)
# Calculate the contribution of dEdt integrated over the spectrum. We only include
# spectral values in region where there is a positive wind input
bulk_time_derivative_spectrum = spectral_time_derivative_in_active_region(
time_derivative_spectrum,
generation,
spectral_grid,
)
# Integrate the input and return the difference of the current guess
# with the desired bulk rate
return (
numba_integrate_spectral_data(generation, spectral_grid)
- bulk_rate
- bulk_time_derivative_spectrum
)
@numba.jit(**numba_default)
def spectral_time_derivative_in_active_region(
time_derivative_spectrum: np.typing.NDArray,
generation: np.typing.NDArray,
spectral_grid,
):
frequency_step = spectral_grid["frequency_step"]
direction_step = spectral_grid["direction_step"]
number_of_directions = time_derivative_spectrum.shape[1]
number_of_frequencies = time_derivative_spectrum.shape[0]
bulk_time_derivative_spectrum = 0.0
for frequency_index in range(number_of_frequencies):
for direction_index in range(number_of_directions):
if generation[frequency_index, direction_index] > 0.0:
bulk_time_derivative_spectrum += (
time_derivative_spectrum[frequency_index, direction_index]
* direction_step[direction_index]
* frequency_step[frequency_index]
)
return bulk_time_derivative_spectrum
@numba.jit(**numba_nocache)
def _u10_from_spectra_point(
variance_density,
guess_u10,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
parameters_generation,
parameters_dissipation,
spectral_grid,
time_derivative_spectrum,
direction_iteration,
) -> Tuple[float, float]:
"""
:param bulk_rate:
:param variance_density: The wave spectrum in m**2/Hz/deg
:param guess_u10:
:param depth:
:param parameters:
:param spectral_grid:
:param progress_bar:
:return:
"""
direction, bulk_rate = _bulk_dissipation_direction_point(
variance_density,
depth,
dissipation_source_term_function,
spectral_grid,
parameters_dissipation,
)
# Note dissipation is negatve- but our target bulk wind generation is positive
u10, direction = _u10_from_bulk_rate_point(
-bulk_rate,
variance_density,
guess_u10,
direction,
depth,
spectral_grid,
parameters_generation,
wind_source_term_function,
tail_stress_parametrization_function,
time_derivative_spectrum,
direction_iteration,
)
return u10, direction
# ----------------------------------------------------------------------------------------------------------------------
# Functions for training (gradients with regard to parameter)
# ----------------------------------------------------------------------------------------------------------------------
@numba.jit(**numba_nocache)
def _u10_parameter_gradient(
variance_density,
guess_u10,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
grad_parameters,
parameters_generation,
parameters_dissipation,
spectral_grid,
time_derivative_spectrum,
direction_iteration,
) -> Tuple[float, float, np.typing.NDArray]:
"""
Function to numerically calculate gradients for the requested coeficients
:param bulk_rate:
:param variance_density: The wave spectrum in m**2/Hz/deg
:param guess_u10:
:param depth:
:param parameters:
:param spectral_grid:
:param progress_bar:
:return:
"""
# Calculate the zero point
direction, bulk_rate = _bulk_dissipation_direction_point(
variance_density,
depth,
dissipation_source_term_function,
spectral_grid,
parameters_dissipation,
)
# Note dissipation is negatve- but our target bulk wind generation is positive
u10, direction = _u10_from_bulk_rate_point(
-bulk_rate,
variance_density,
guess_u10,
direction,
depth,
spectral_grid,
parameters_generation,
wind_source_term_function,
tail_stress_parametrization_function,
time_derivative_spectrum,
direction_iteration,
)
grad = np.empty(len(grad_parameters))
if np.isnan(u10):
grad[:] = 0
return u10, direction, grad
for index, param in enumerate(grad_parameters):
perturbed_parameters_dissipation = parameters_dissipation.copy()
perturbed_parameters_generation = parameters_generation.copy()
if param in parameters_dissipation:
step = 0.05 * abs(perturbed_parameters_dissipation[param])
perturbed_parameters_dissipation[param] += step
dissipation = dissipation_source_term_function(
variance_density=variance_density,
depth=depth,
spectral_grid=spectral_grid,
parameters=perturbed_parameters_dissipation,
)
new_bulk_rate = numba_integrate_spectral_data(dissipation, spectral_grid)
else:
step = 0.05 * abs(perturbed_parameters_generation[param])
perturbed_parameters_generation[param] += step
new_bulk_rate = bulk_rate
new_u10, direction = _u10_from_bulk_rate_point(
-new_bulk_rate,
variance_density,
u10,
direction,
depth,
spectral_grid,
perturbed_parameters_generation,
wind_source_term_function,
tail_stress_parametrization_function,
time_derivative_spectrum,
direction_iteration,
)
if np.isnan(new_u10):
grad[index] = 0
else:
grad[index] = (new_u10 - u10) / step
return u10, direction, grad
# ----------------------------------------------------------------------------------------------------------------------
# Apply to all spatial points
# ----------------------------------------------------------------------------------------------------------------------
@numba.jit(**numba_nocache_parallel)
def _u10_from_spectra_gradient(
variance_density,
guess_u10,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
grad_parameters,
parameters_generation,
parameters_dissipation,
spectral_grid,
time_derivative_spectrum,
progress_bar: ProgressBar = None,
direction_iteration=False,
) -> Tuple[np.typing.NDArray, np.typing.NDArray, np.typing.NDArray]:
"""
:param variance_density:
:param guess_u10:
:param depth:
:param wind_source_term_function:
:param dissipation_source_term_function:
:param parameters_generation:
:param parameters_dissipation:
:param spectral_grid:
:param progress_bar:
:return:
"""
number_of_points = variance_density.shape[0]
u10 = np.empty((number_of_points))
direction = np.empty((number_of_points))
grad = np.empty((number_of_points, len(grad_parameters)))
for point_index in numba.prange(number_of_points):
if progress_bar is not None:
progress_bar.update(1)
(
u10[point_index],
direction[point_index],
grad[point_index, :],
) = _u10_parameter_gradient(
variance_density[point_index, :, :],
guess_u10[point_index],
depth[point_index],
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
grad_parameters,
parameters_generation,
parameters_dissipation,
spectral_grid,
time_derivative_spectrum[point_index, :, :],
direction_iteration,
)
return u10, direction, grad
@numba.jit(**numba_nocache_parallel)
def _u10_from_spectra(
variance_density: np.typing.NDArray,
guess_u10: np.typing.NDArray,
depth: np.typing.NDArray,
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
parameters_generation,
parameters_dissipation,
spectral_grid,
progress_bar: ProgressBar = None,
time_derivative_spectrum=None,
direction_iteration=False,
) -> Tuple[np.typing.NDArray, np.typing.NDArray]:
"""
:param variance_density:
:param guess_u10:
:param depth:
:param wind_source_term_function:
:param dissipation_source_term_function:
:param parameters_generation:
:param parameters_dissipation:
:param spectral_grid:
:param progress_bar:
:return:
"""
number_of_points = variance_density.shape[0]
u10 = np.empty((number_of_points))
direction = np.empty((number_of_points))
for point_index in numba.prange(number_of_points):
if progress_bar is not None:
progress_bar.update(1)
u10[point_index], direction[point_index] = _u10_from_spectra_point(
variance_density[point_index, :, :],
guess_u10[point_index],
depth[point_index],
wind_source_term_function,
tail_stress_parametrization_function,
dissipation_source_term_function,
parameters_generation,
parameters_dissipation,
spectral_grid,
time_derivative_spectrum[point_index, :, :],
direction_iteration,
)
return u10, direction
Functions
def spectral_time_derivative_in_active_region(time_derivative_spectrum: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]], generation: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]], spectral_grid)
-
Expand source code
@numba.jit(**numba_default) def spectral_time_derivative_in_active_region( time_derivative_spectrum: np.typing.NDArray, generation: np.typing.NDArray, spectral_grid, ): frequency_step = spectral_grid["frequency_step"] direction_step = spectral_grid["direction_step"] number_of_directions = time_derivative_spectrum.shape[1] number_of_frequencies = time_derivative_spectrum.shape[0] bulk_time_derivative_spectrum = 0.0 for frequency_index in range(number_of_frequencies): for direction_index in range(number_of_directions): if generation[frequency_index, direction_index] > 0.0: bulk_time_derivative_spectrum += ( time_derivative_spectrum[frequency_index, direction_index] * direction_step[direction_index] * frequency_step[frequency_index] ) return bulk_time_derivative_spectrum
def windspeed_and_direction_from_spectra(balance: SourceTermBalance, guess_u10: xarray.core.dataarray.DataArray, spectrum: FrequencyDirectionSpectrum, jacobian: bool = False, jacobian_parameters: Optional[List[str]] = None, time_derivative_spectrum: Optional[FrequencyDirectionSpectrum] = None, direction_iteration: bool = False) ‑> xarray.core.dataset.Dataset
-
:param bulk_rate: :param guess_u10: :param guess_direction: :param spectrum: :return:
Expand source code
def windspeed_and_direction_from_spectra( balance: SourceTermBalance, guess_u10: xarray.DataArray, spectrum: FrequencyDirectionSpectrum, jacobian: bool = False, jacobian_parameters: Optional[List[str]] = None, time_derivative_spectrum: Optional[FrequencyDirectionSpectrum] = None, direction_iteration: bool = False, ) -> xarray.Dataset: """ :param bulk_rate: :param guess_u10: :param guess_direction: :param spectrum: :return: """ disable = spectrum.number_of_spectra < 100 if time_derivative_spectrum is None: time_derivative_spectrum_values = np.zeros(spectrum.shape()) else: time_derivative_spectrum_values = ( time_derivative_spectrum.variance_density.values ) with ProgressBar( total=spectrum.number_of_spectra, disable=disable, desc=f"Estimating U10 from {balance.generation.name} " f"and {balance.dissipation.name} wind and dissipation source terms", ) as progress_bar: if not jacobian: speed, direction = _u10_from_spectra( variance_density=spectrum.variance_density.values, guess_u10=guess_u10.values, depth=spectrum.depth.values, wind_source_term_function=balance.generation._wind_source_term_function, tail_stress_parametrization_function=( balance.generation._tail_stress_parametrization_function ), dissipation_source_term_function=( balance.dissipation._dissipation_function ), parameters_generation=balance.generation.parameters, parameters_dissipation=balance.dissipation.parameters, spectral_grid=balance.generation.spectral_grid(spectrum), progress_bar=progress_bar, time_derivative_spectrum=time_derivative_spectrum_values, direction_iteration=direction_iteration, ) else: if jacobian_parameters is None: raise ValueError( "If gradients are requested a parameter list is required" ) speed, direction, grad = _u10_from_spectra_gradient( variance_density=spectrum.variance_density.values, guess_u10=guess_u10.values, depth=spectrum.depth.values, wind_source_term_function=balance.generation._wind_source_term_function, tail_stress_parametrization_function=( balance.generation._tail_stress_parametrization_function ), dissipation_source_term_function=( balance.dissipation._dissipation_function ), grad_parameters=numba.typed.List(jacobian_parameters), parameters_generation=balance.generation.parameters, parameters_dissipation=balance.dissipation.parameters, spectral_grid=balance.generation.spectral_grid(spectrum), time_derivative_spectrum=time_derivative_spectrum, progress_bar=progress_bar, direction_iteration=direction_iteration, ) grad = xarray.DataArray(data=grad) u10 = xarray.DataArray( data=speed, dims=spectrum.dims_space_time, coords=spectrum.coords_space_time, ) direction = xarray.DataArray( data=direction, dims=spectrum.dims_space_time, coords=spectrum.coords_space_time, ) if jacobian: return xarray.Dataset( data_vars={"u10": u10, "direction": direction, "jacobian": grad} ) else: return xarray.Dataset(data_vars={"u10": u10, "direction": direction})