Module ocean_science_utilities.wavephysics.balance.generation

Expand source code
import numba  # type: ignore
import numpy as np
import xarray

from typing import Callable, Dict, Literal, Mapping, Optional, Tuple


from ocean_science_utilities.wavephysics.balance._numba_settings import (
    numba_default,
    numba_nocache_parallel,
)
from ocean_science_utilities.wavephysics.balance.source_term import SourceTerm
from ocean_science_utilities.wavephysics.balance.stress import (
    _roughness_estimate,
    _wave_supported_stress,
    _tail_supported_stress,
)
from ocean_science_utilities.wavespectra.operations import numba_integrate_spectral_data
from ocean_science_utilities.wavespectra.spectrum import FrequencyDirectionSpectrum

TWindInputType = Literal["u10", "friction_velocity", "ustar"]


class WindGeneration(SourceTerm):
    name = "base"

    def __init__(self, parmaters):
        super(WindGeneration, self).__init__(parmaters)
        self._wind_source_term_function = None
        self._tail_stress_parametrization_function = _tail_stress_parametrization_none

    def update_parameters(self, parameters: Mapping):
        for key in parameters:
            if key in self._parameters:
                self._parameters[key] = parameters[key]

    def rate(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
        **kwargs,
    ) -> xarray.DataArray:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)
        wind_input = _wind_generation(
            spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            wind_source_term_function=self._wind_source_term_function,
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )
        return xarray.DataArray(
            data=wind_input, dims=spectrum.dims, coords=spectrum.coords()
        )

    def bulk_rate(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.DataArray:
        wind = (speed.values, direction.values, wind_speed_input_type)

        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind_input = _bulk_wind_generation(
            spectrum.variance_density.values,
            wind,
            spectrum.depth.values,
            roughness_length.values,
            self._wind_source_term_function,
            self.spectral_grid(spectrum),
            self.parameters,
        )
        return xarray.DataArray(
            data=wind_input,
            dims=spectrum.dims_space_time,
            coords=spectrum.coords_space_time,
        )

    def roughness(
        self,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        spectrum: FrequencyDirectionSpectrum,
        roughness_length_guess: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.DataArray:
        if roughness_length_guess is None:
            roughness_length_guess = xarray.zeros_like(speed) - 1

        wind = (speed.values, direction.values, wind_speed_input_type)

        rougness = _roughness_estimate(
            guess=roughness_length_guess.values,
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            wind_source_term_function=self._wind_source_term_function,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )
        return xarray.DataArray(
            data=rougness,
            dims=spectrum.dims_space_time,
            coords=spectrum.coords_space_time,
        )

    def stress(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.Dataset:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)

        stress = _wave_supported_stress(
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            wind_source_term_function=self._wind_source_term_function,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )

        return xarray.Dataset(
            data_vars={
                "stress": (spectrum.dims_space_time, stress[0]),
                "direction": (spectrum.dims_space_time, stress[1]),
            },
            coords=spectrum.coords_space_time,
        )

    def friction_velocity(
        self,
        spectrum: FrequencyDirectionSpectrum,
        u10: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
    ) -> xarray.DataArray:
        stress = self.stress(spectrum, u10, direction, roughness_length, "u10")
        return np.sqrt(stress["stress"] / self.parameters["air_density"])

    def drag(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ):
        if wind_speed_input_type == "u10":
            u10 = speed
            u_star = self.friction_velocity(
                spectrum, speed, direction, roughness_length
            )
        else:
            u_star = speed
            roughness = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )
            u10 = (
                u_star * self.parameters["vonkarman_constant"] * np.log(10 / roughness)
            )

        return xarray.DataArray(data=u_star.values**2 / u10.values**2)

    def tail_stress(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.Dataset:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)
        stress = _tail_supported_stress(
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )

        return xarray.Dataset(
            data_vars={
                "stress": (spectrum.dims_space_time, stress[0]),
                "direction": (spectrum.dims_space_time, stress[1]),
            },
            coords=spectrum.coords_space_time,
        )


# ----------------------------------------------------------------------------------------------------------------------
# Apply to all spatial points
# ----------------------------------------------------------------------------------------------------------------------


@numba.jit(**numba_nocache_parallel)
def _wind_generation(
    variance_density: np.ndarray,
    wind: Tuple[np.ndarray, np.ndarray, TWindInputType],
    depth: np.ndarray,
    roughness_length: np.ndarray,
    wind_source_term_function: Callable,
    spectral_grid: Dict[str, np.ndarray],
    parameters: Mapping,
) -> np.typing.NDArray:
    (
        number_of_points,
        number_of_frequencies,
        number_of_directions,
    ) = variance_density.shape

    generation = np.empty(
        (number_of_points, number_of_frequencies, number_of_directions)
    )
    for point_index in range(number_of_points):
        wind_at_point = (wind[0][point_index], wind[1][point_index], wind[2])
        wind_generation = wind_source_term_function(
            variance_density[point_index, :, :],
            wind_at_point,
            depth[point_index],
            roughness_length[point_index],
            spectral_grid,
            parameters,
        )
        generation[point_index, :, :] = wind_generation
    return generation


@numba.jit(**numba_nocache_parallel)
def _bulk_wind_generation(
    variance_density: np.ndarray,
    wind: Tuple[np.ndarray, np.ndarray, str],
    depth: np.ndarray,
    roughness_length: np.ndarray,
    wind_source_term_function: Callable,
    spectral_grid: Dict[str, np.ndarray],
    parameters: Mapping,
) -> np.typing.NDArray:
    number_of_points = variance_density.shape[0]
    generation = np.empty((number_of_points))

    for point_index in numba.prange(number_of_points):
        wind_at_point = (wind[0][point_index], wind[1][point_index], wind[2])
        wind_generation = wind_source_term_function(
            variance_density=variance_density[point_index, :, :],
            wind=wind_at_point,
            depth=depth[point_index],
            roughness_length=roughness_length[point_index],
            spectral_grid=spectral_grid,
            parameters=parameters,
        )
        generation[point_index] = numba_integrate_spectral_data(
            wind_generation, spectral_grid
        )
    return generation


# ----------------------------------------------------------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------------------------------------------------------


@numba.jit(**numba_default)
def _spectral_grid(
    radian_frequency: np.ndarray,
    radian_direction: np.ndarray,
    frequency_step: np.ndarray,
    direction_step: np.ndarray,
) -> Dict[str, np.ndarray]:
    return {
        "radian_frequency": radian_frequency,
        "radian_direction": radian_direction,
        "frequency_step": frequency_step,
        "direction_step": direction_step,
    }


def _numba_parameters(**kwargs) -> numba.typed.Dict:
    _dict = numba.typed.Dict.empty(
        key_type=numba.types.unicode_type, value_type=numba.types.float64
    )
    for key in kwargs:
        _dict[key] = kwargs[key]

    return _dict


@numba.jit(**numba_default)
def _tail_stress_parametrization_none(
    variance_density: np.ndarray,
    wind: Tuple[np.ndarray, np.ndarray, str],
    depth: np.ndarray,
    roughness_length: np.ndarray,
    wind_source_term_function: Callable,
    spectral_grid: Dict[str, np.ndarray],
    parameters: Mapping,
) -> float:
    return 0.0

Classes

class WindGeneration (parmaters)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class WindGeneration(SourceTerm):
    name = "base"

    def __init__(self, parmaters):
        super(WindGeneration, self).__init__(parmaters)
        self._wind_source_term_function = None
        self._tail_stress_parametrization_function = _tail_stress_parametrization_none

    def update_parameters(self, parameters: Mapping):
        for key in parameters:
            if key in self._parameters:
                self._parameters[key] = parameters[key]

    def rate(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
        **kwargs,
    ) -> xarray.DataArray:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)
        wind_input = _wind_generation(
            spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            wind_source_term_function=self._wind_source_term_function,
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )
        return xarray.DataArray(
            data=wind_input, dims=spectrum.dims, coords=spectrum.coords()
        )

    def bulk_rate(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.DataArray:
        wind = (speed.values, direction.values, wind_speed_input_type)

        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind_input = _bulk_wind_generation(
            spectrum.variance_density.values,
            wind,
            spectrum.depth.values,
            roughness_length.values,
            self._wind_source_term_function,
            self.spectral_grid(spectrum),
            self.parameters,
        )
        return xarray.DataArray(
            data=wind_input,
            dims=spectrum.dims_space_time,
            coords=spectrum.coords_space_time,
        )

    def roughness(
        self,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        spectrum: FrequencyDirectionSpectrum,
        roughness_length_guess: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.DataArray:
        if roughness_length_guess is None:
            roughness_length_guess = xarray.zeros_like(speed) - 1

        wind = (speed.values, direction.values, wind_speed_input_type)

        rougness = _roughness_estimate(
            guess=roughness_length_guess.values,
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            wind_source_term_function=self._wind_source_term_function,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )
        return xarray.DataArray(
            data=rougness,
            dims=spectrum.dims_space_time,
            coords=spectrum.coords_space_time,
        )

    def stress(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.Dataset:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)

        stress = _wave_supported_stress(
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            wind_source_term_function=self._wind_source_term_function,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )

        return xarray.Dataset(
            data_vars={
                "stress": (spectrum.dims_space_time, stress[0]),
                "direction": (spectrum.dims_space_time, stress[1]),
            },
            coords=spectrum.coords_space_time,
        )

    def friction_velocity(
        self,
        spectrum: FrequencyDirectionSpectrum,
        u10: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
    ) -> xarray.DataArray:
        stress = self.stress(spectrum, u10, direction, roughness_length, "u10")
        return np.sqrt(stress["stress"] / self.parameters["air_density"])

    def drag(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ):
        if wind_speed_input_type == "u10":
            u10 = speed
            u_star = self.friction_velocity(
                spectrum, speed, direction, roughness_length
            )
        else:
            u_star = speed
            roughness = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )
            u10 = (
                u_star * self.parameters["vonkarman_constant"] * np.log(10 / roughness)
            )

        return xarray.DataArray(data=u_star.values**2 / u10.values**2)

    def tail_stress(
        self,
        spectrum: FrequencyDirectionSpectrum,
        speed: xarray.DataArray,
        direction: xarray.DataArray,
        roughness_length: Optional[xarray.DataArray] = None,
        wind_speed_input_type: TWindInputType = "u10",
    ) -> xarray.Dataset:
        if roughness_length is None:
            roughness_length = self.roughness(
                speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
            )

        wind = (speed.values, direction.values, wind_speed_input_type)
        stress = _tail_supported_stress(
            variance_density=spectrum.variance_density.values,
            wind=wind,
            depth=spectrum.depth.values,
            roughness_length=roughness_length.values,
            tail_stress_parametrization_function=(
                self._tail_stress_parametrization_function
            ),
            spectral_grid=self.spectral_grid(spectrum),
            parameters=self.parameters,
        )

        return xarray.Dataset(
            data_vars={
                "stress": (spectrum.dims_space_time, stress[0]),
                "direction": (spectrum.dims_space_time, stress[1]),
            },
            coords=spectrum.coords_space_time,
        )

Ancestors

Subclasses

Class variables

var name

Methods

def bulk_rate(self, spectrum: FrequencyDirectionSpectrum, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10') ‑> xarray.core.dataarray.DataArray
Expand source code
def bulk_rate(
    self,
    spectrum: FrequencyDirectionSpectrum,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
) -> xarray.DataArray:
    wind = (speed.values, direction.values, wind_speed_input_type)

    if roughness_length is None:
        roughness_length = self.roughness(
            speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
        )

    wind_input = _bulk_wind_generation(
        spectrum.variance_density.values,
        wind,
        spectrum.depth.values,
        roughness_length.values,
        self._wind_source_term_function,
        self.spectral_grid(spectrum),
        self.parameters,
    )
    return xarray.DataArray(
        data=wind_input,
        dims=spectrum.dims_space_time,
        coords=spectrum.coords_space_time,
    )
def drag(self, spectrum: FrequencyDirectionSpectrum, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10')
Expand source code
def drag(
    self,
    spectrum: FrequencyDirectionSpectrum,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
):
    if wind_speed_input_type == "u10":
        u10 = speed
        u_star = self.friction_velocity(
            spectrum, speed, direction, roughness_length
        )
    else:
        u_star = speed
        roughness = self.roughness(
            speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
        )
        u10 = (
            u_star * self.parameters["vonkarman_constant"] * np.log(10 / roughness)
        )

    return xarray.DataArray(data=u_star.values**2 / u10.values**2)
def friction_velocity(self, spectrum: FrequencyDirectionSpectrum, u10: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None) ‑> xarray.core.dataarray.DataArray
Expand source code
def friction_velocity(
    self,
    spectrum: FrequencyDirectionSpectrum,
    u10: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
) -> xarray.DataArray:
    stress = self.stress(spectrum, u10, direction, roughness_length, "u10")
    return np.sqrt(stress["stress"] / self.parameters["air_density"])
def rate(self, spectrum: FrequencyDirectionSpectrum, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10', **kwargs) ‑> xarray.core.dataarray.DataArray
Expand source code
def rate(
    self,
    spectrum: FrequencyDirectionSpectrum,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
    **kwargs,
) -> xarray.DataArray:
    if roughness_length is None:
        roughness_length = self.roughness(
            speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
        )

    wind = (speed.values, direction.values, wind_speed_input_type)
    wind_input = _wind_generation(
        spectrum.variance_density.values,
        wind=wind,
        depth=spectrum.depth.values,
        roughness_length=roughness_length.values,
        wind_source_term_function=self._wind_source_term_function,
        spectral_grid=self.spectral_grid(spectrum),
        parameters=self.parameters,
    )
    return xarray.DataArray(
        data=wind_input, dims=spectrum.dims, coords=spectrum.coords()
    )
def roughness(self, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, spectrum: FrequencyDirectionSpectrum, roughness_length_guess: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10') ‑> xarray.core.dataarray.DataArray
Expand source code
def roughness(
    self,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    spectrum: FrequencyDirectionSpectrum,
    roughness_length_guess: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
) -> xarray.DataArray:
    if roughness_length_guess is None:
        roughness_length_guess = xarray.zeros_like(speed) - 1

    wind = (speed.values, direction.values, wind_speed_input_type)

    rougness = _roughness_estimate(
        guess=roughness_length_guess.values,
        variance_density=spectrum.variance_density.values,
        wind=wind,
        depth=spectrum.depth.values,
        wind_source_term_function=self._wind_source_term_function,
        tail_stress_parametrization_function=(
            self._tail_stress_parametrization_function
        ),
        spectral_grid=self.spectral_grid(spectrum),
        parameters=self.parameters,
    )
    return xarray.DataArray(
        data=rougness,
        dims=spectrum.dims_space_time,
        coords=spectrum.coords_space_time,
    )
def stress(self, spectrum: FrequencyDirectionSpectrum, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10') ‑> xarray.core.dataset.Dataset
Expand source code
def stress(
    self,
    spectrum: FrequencyDirectionSpectrum,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
) -> xarray.Dataset:
    if roughness_length is None:
        roughness_length = self.roughness(
            speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
        )

    wind = (speed.values, direction.values, wind_speed_input_type)

    stress = _wave_supported_stress(
        variance_density=spectrum.variance_density.values,
        wind=wind,
        depth=spectrum.depth.values,
        roughness_length=roughness_length.values,
        wind_source_term_function=self._wind_source_term_function,
        tail_stress_parametrization_function=(
            self._tail_stress_parametrization_function
        ),
        spectral_grid=self.spectral_grid(spectrum),
        parameters=self.parameters,
    )

    return xarray.Dataset(
        data_vars={
            "stress": (spectrum.dims_space_time, stress[0]),
            "direction": (spectrum.dims_space_time, stress[1]),
        },
        coords=spectrum.coords_space_time,
    )
def tail_stress(self, spectrum: FrequencyDirectionSpectrum, speed: xarray.core.dataarray.DataArray, direction: xarray.core.dataarray.DataArray, roughness_length: Optional[xarray.core.dataarray.DataArray] = None, wind_speed_input_type: Literal['u10', 'friction_velocity', 'ustar'] = 'u10') ‑> xarray.core.dataset.Dataset
Expand source code
def tail_stress(
    self,
    spectrum: FrequencyDirectionSpectrum,
    speed: xarray.DataArray,
    direction: xarray.DataArray,
    roughness_length: Optional[xarray.DataArray] = None,
    wind_speed_input_type: TWindInputType = "u10",
) -> xarray.Dataset:
    if roughness_length is None:
        roughness_length = self.roughness(
            speed, direction, spectrum, wind_speed_input_type=wind_speed_input_type
        )

    wind = (speed.values, direction.values, wind_speed_input_type)
    stress = _tail_supported_stress(
        variance_density=spectrum.variance_density.values,
        wind=wind,
        depth=spectrum.depth.values,
        roughness_length=roughness_length.values,
        tail_stress_parametrization_function=(
            self._tail_stress_parametrization_function
        ),
        spectral_grid=self.spectral_grid(spectrum),
        parameters=self.parameters,
    )

    return xarray.Dataset(
        data_vars={
            "stress": (spectrum.dims_space_time, stress[0]),
            "direction": (spectrum.dims_space_time, stress[1]),
        },
        coords=spectrum.coords_space_time,
    )
def update_parameters(self, parameters: Mapping)
Expand source code
def update_parameters(self, parameters: Mapping):
    for key in parameters:
        if key in self._parameters:
            self._parameters[key] = parameters[key]