Expand source code
import numpy as np
from numba import jit # type: ignore
from typing import Tuple
from ocean_science_utilities.wavephysics.balance._numba_settings import numba_nocache
from ocean_science_utilities.wavephysics.balance.solvers import numba_newton_raphson
from ocean_science_utilities.wavephysics.balance.wam_tail_stress import (
_charnock_relation_point,
)
from ocean_science_utilities.wavetheory.lineardispersion import (
inverse_intrinsic_dispersion_relation,
)
@jit(**numba_nocache)
def _wave_supported_stress_point(
wind_input: np.typing.NDArray,
depth,
spectral_grid,
variance_density,
wind,
roughness_length,
tail_stress_parametrization_function,
parameters,
) -> Tuple[float, float]:
"""
:param wind_input:
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
number_of_frequencies, number_of_directions = wind_input.shape
radian_frequency = spectral_grid["radian_frequency"]
radian_direction = spectral_grid["radian_direction"]
frequency_step = spectral_grid["frequency_step"]
direction_step = spectral_grid["direction_step"]
gravitational_acceleration = parameters["gravitational_acceleration"]
if depth == np.inf:
wavenumber = radian_frequency**2 / gravitational_acceleration
else:
wavenumber = inverse_intrinsic_dispersion_relation(radian_frequency, depth)
cosine_step = np.cos(radian_direction) * direction_step
sine_step = np.sin(radian_direction) * direction_step
# Calculate the resolved part of the wavestress over the given input spectrum
stress_east = 0.0
stress_north = 0.0
for frequency_index in range(number_of_frequencies):
inverse_wave_speed = (
wavenumber[frequency_index] / radian_frequency[frequency_index]
) * frequency_step[frequency_index]
for direction_index in range(number_of_directions):
stress_east += (
cosine_step[direction_index]
* wind_input[frequency_index, direction_index]
* inverse_wave_speed
)
stress_north += (
sine_step[direction_index]
* wind_input[frequency_index, direction_index]
* inverse_wave_speed
)
# calculate the magnitude
stress_north *= (
parameters["gravitational_acceleration"] * parameters["water_density"]
)
stress_east *= (
parameters["gravitational_acceleration"] * parameters["water_density"]
)
# Add stress contribution due to unresolved waves in the spectral tail
# (above the last resolved frequency)
(
eastward_tail_wave_stress,
northward_tail_wave_stress,
) = tail_stress_parametrization_function(
variance_density, wind, depth, roughness_length, spectral_grid, parameters
)
stress_east += eastward_tail_wave_stress
stress_north += northward_tail_wave_stress
# Total stress is the sum or resolved and unresolved part.
return stress_east, stress_north # wave_stress_magnitude, wave_stress_direction
@jit(**numba_nocache)
def _roughness_estimate_point(
guess,
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
):
"""
:param guess:
:param variance_density:
:param wind:
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
vonkarman_constant = parameters["vonkarman_constant"]
if guess < 0:
if wind[2] == "u10":
drag_coeficient_wu = (0.8 + 0.065 * wind[0]) / 1000
guess = 10 / np.exp(vonkarman_constant / np.sqrt(drag_coeficient_wu))
elif wind[2] in ["ustar", "friction_velocity"]:
guess = _charnock_relation_point(wind[0], parameters)
else:
raise ValueError("unknown wind forcing")
if np.any(np.isnan(variance_density)):
return np.nan
if wind[0] == 0.0:
return np.nan
generation = np.empty((variance_density.shape))
function_arguments = (
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
generation,
)
log_root = numba_newton_raphson(
_stress_iteration_function,
np.log(guess),
function_arguments,
hard_bounds=(-20, 0),
relative_stepsize=False,
atol=1e-6,
rtol=1e-6,
error_on_max_iter=True,
max_iterations=100,
verbose=False,
name="stress iteration",
aitken_acceleration=False,
)
return np.exp(log_root)
@jit(**numba_nocache)
def _roughness_estimate(
guess,
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
) -> np.typing.NDArray:
"""
:param guess:
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind: wind input tuple (magnitude, direction_degrees, type_string)
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
number_of_points = variance_density.shape[0]
roughness = np.empty((number_of_points))
for point_index in range(number_of_points):
wind_at_point = (wind[0][point_index], wind[1][point_index], wind[2])
if np.isnan(wind[0][point_index]):
roughness[point_index] = np.nan
else:
try:
roughness[point_index] = _roughness_estimate_point(
guess=guess[point_index],
variance_density=variance_density[point_index, :, :],
wind=wind_at_point,
depth=depth[point_index],
wind_source_term_function=wind_source_term_function,
tail_stress_parametrization_function=tail_stress_parametrization_function,
spectral_grid=spectral_grid,
parameters=parameters,
)
except:
roughness[point_index] = np.nan
return roughness
@jit(**numba_nocache)
def _wave_supported_stress(
variance_density,
wind,
depth,
roughness_length,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
) -> Tuple[np.typing.NDArray, np.typing.NDArray]:
"""
Calculate the wave supported wind stress.
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind: wind input tuple (magnitude, direction_degrees, type_string)
:param depth:
:param roughness_length: Surface roughness length in meter.
:param spectral_grid:
:param parameters:
:return:
"""
number_of_points = variance_density.shape[0]
magnitude = np.empty((number_of_points))
direction = np.empty((number_of_points))
for point_index in range(number_of_points):
wind_at_point = (wind[0][point_index], wind[1][point_index], wind[2])
(
magnitude[point_index],
direction[point_index],
) = _total_stress_point(
roughness_length[point_index],
variance_density[point_index, :, :],
wind=wind_at_point,
depth=depth[point_index],
wind_source_term_function=wind_source_term_function,
tail_stress_parametrization_function=tail_stress_parametrization_function,
spectral_grid=spectral_grid,
parameters=parameters,
)
return magnitude, direction
@jit(**numba_nocache)
def _tail_supported_stress(
variance_density,
wind,
depth,
roughness_length,
tail_stress_parametrization_function,
spectral_grid,
parameters,
) -> Tuple[np.typing.NDArray, np.typing.NDArray]:
"""
Calculate the wave supported wind stress.
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind: wind input tuple (magnitude, direction_degrees, type_string)
:param depth:
:param roughness_length: Surface roughness length in meter.
:param spectral_grid:
:param parameters:
:return:
"""
number_of_points = variance_density.shape[0]
stress_east = np.empty((number_of_points))
stress_north = np.empty((number_of_points))
for point_index in range(number_of_points):
wind_at_point = (wind[0][point_index], wind[1][point_index], wind[2])
(
stress_east[point_index],
stress_north[point_index],
) = tail_stress_parametrization_function(
variance_density[point_index, :, :],
wind_at_point,
depth[point_index],
roughness_length[point_index],
spectral_grid,
parameters,
)
magnitude = np.sqrt(stress_north**2 + stress_east**2)
direction = np.arctan2(stress_north, stress_east) % 360
return magnitude, direction
@jit(**numba_nocache)
def _stress_iteration_function(
log_roughness_length,
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
work_array,
):
"""
The surface roughness is defined implicitly. We use a fixed-point iteration step
to solve for the roughness. This is the iteration function used in the
fixed point iteration.
:param roughness_length: Surface roughness length in meter.
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind: wind input tuple (magnitude, direction_degrees, type_string)
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
roughness_length = np.exp(log_roughness_length)
vonkarman_constant = parameters["vonkarman_constant"]
elevation = parameters["elevation"]
if wind[2] == "u10":
friction_velocity = (
wind[0] * vonkarman_constant / np.log(elevation / roughness_length)
)
else:
friction_velocity = wind[0]
# Get the wind input source term values
work_array = wind_source_term_function(
variance_density,
wind,
depth,
roughness_length,
spectral_grid,
parameters,
work_array,
)
total_stress_estimate, total_stress_direction_estimate = _total_stress_point(
roughness_length,
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
work_array,
)
return parameters["air_density"] * friction_velocity**2 - total_stress_estimate
@jit(**numba_nocache)
def _total_stress_point(
roughness_length,
variance_density,
wind,
depth,
wind_source_term_function,
tail_stress_parametrization_function,
spectral_grid,
parameters,
work_array=None,
):
"""
:param roughness_length: Surface roughness length in meter.
:param variance_density: The wave spectrum in m**2/Hz/deg
:param wind: wind input tuple (magnitude, direction_degrees, type_string)
:param depth:
:param spectral_grid:
:param parameters:
:return:
"""
vonkarman_constant = parameters["vonkarman_constant"]
elevation = parameters["elevation"]
if wind[2] == "u10":
friction_velocity = (
wind[0] * vonkarman_constant / np.log(elevation / roughness_length)
)
else:
friction_velocity = wind[0]
if friction_velocity == 0.0:
return 0.0, np.nan
if np.isnan(friction_velocity):
return np.nan, np.nan
# Get the wind input source term values
work_array = wind_source_term_function(
variance_density,
wind,
depth,
roughness_length,
spectral_grid,
parameters,
work_array,
)
# Calculate the stress
stress_east, stress_north = _wave_supported_stress_point(
work_array,
depth,
spectral_grid,
variance_density,
wind,
roughness_length,
tail_stress_parametrization_function,
parameters,
)
viscous_stress = (
parameters["viscous_stress_parameter"]
* parameters["air_density"]
* friction_velocity
* parameters["air_viscosity"]
/ vonkarman_constant
/ roughness_length
)
wind_direction_radian = wind[1] * np.pi / 180
stress_north += viscous_stress * np.sin(wind_direction_radian)
stress_east += viscous_stress * np.cos(wind_direction_radian)
stress_direction = (np.arctan2(stress_north, stress_east) * 180 / np.pi) % 360
stress_magnitude = np.sqrt(stress_north**2 + stress_east**2)
return stress_magnitude, stress_direction