Module ocean_science_utilities.wavephysics.windestimate

Contents: Wind Estimator

Copyright (C) 2022 Sofar Ocean Technologies

Authors: Pieter Bart Smit

Expand source code
"""
Contents: Wind Estimator

Copyright (C) 2022
Sofar Ocean Technologies

Authors: Pieter Bart Smit
"""
import numpy as np
import xarray

from typing import Literal, Tuple, Union

from ocean_science_utilities.wavephysics.balance.wind_inversion import (
    windspeed_and_direction_from_spectra,
)
from ocean_science_utilities.wavephysics.balance.balance import SourceTermBalance
from ocean_science_utilities.wavephysics.roughness import charnock_roughness_length
from ocean_science_utilities.wavespectra.spectrum import (
    FrequencySpectrum,
    FrequencyDirectionSpectrum,
)


_methods = Literal["peak", "mean"]
_charnock_parametrization = Literal["constant", "voermans15", "voermans16"]
_direction_convention = Literal[
    "coming_from_clockwise_north", "going_to_counter_clockwise_east"
]


def friction_velocity(
    spectrum: FrequencySpectrum,
    method: _methods = "peak",
    fmax: float = 0.5,
    power: float = 4,
    directional_spreading_constant: float = 2.5,
    beta: float = 0.012,
    grav: float = 9.81,
    number_of_bins: int = 20,
) -> xarray.Dataset:
    """

    :param spectrum:
    :param method:
    :param fmax:
    :param power:
    :param directional_spreading_constant:
    :param beta:
    :param grav:
    :param number_of_bins:
    :return:
    """
    e, a1, b1 = equilibrium_range_values(
        spectrum,
        method=method,
        fmax=fmax,
        power=power,
        number_of_bins=number_of_bins,
    )

    emean = 8.0 * np.pi**3 * e

    # Get friction velocity from spectrum
    friction_velocity_estimate = (
        emean / grav / directional_spreading_constant / beta / 4
    )

    # Estimate direction from tail
    direction = (180.0 / np.pi * np.arctan2(b1, a1)) % 360
    coords = {x: spectrum.dataset[x].values for x in spectrum.dims_space_time}
    return xarray.Dataset(
        data_vars={
            "friction_velocity": (spectrum.dims_space_time, friction_velocity_estimate),
            "direction": (spectrum.dims_space_time, direction),
        },
        coords=coords,
    )


def estimate_u10_from_spectrum(
    spectrum: Union[FrequencySpectrum, FrequencyDirectionSpectrum],
    method: _methods = "peak",
    fmax=0.5,
    power=4,
    directional_spreading_constant=2.5,
    phillips_constant_beta=0.012,
    vonkarman_constant=0.4,
    grav=9.81,
    number_of_bins=20,
    direction_convention: _direction_convention = "going_to_counter_clockwise_east",
    **kwargs,
) -> xarray.Dataset:
    #
    # =========================================================================
    # Required Input
    # =========================================================================
    #
    # f              :: frequencies (in Hz)
    # E              :: Variance densities (in m^2 / Hz )
    #
    # =========================================================================
    # Output
    # =========================================================================
    #
    # U10            :: in m/s
    # Direction      :: in degrees clockwise from North (where wind is *coming from)
    #
    # =========================================================================
    # Named Keywords (parameters to inversion algorithm)
    # =========================================================================
    # Npower = 4     :: exponent for the fitted f^-N spectral tail
    # I      = 2.5   :: Philips Directional Constant
    # beta   = 0.012 :: Equilibrium Constant
    # Kapppa = 0.4   :: Von Karman constant
    # Alpha  = 0.012 :: Constant in estimating z0 from u* in Charnock relation
    # grav   = 9.81  :: Gravitational acceleration
    #
    # =========================================================================
    # Algorithm
    # =========================================================================
    #
    # 1) Find the part of the spectrum that best fits a f^-4 shape
    # 2) Estimate the Phillips equilibrium level "Emean" over that range
    # 3) Use Emean to estimate Wind speed (using Charnock and LogLaw)
    # 4) Calculate mean direction over equilibrium range

    if isinstance(spectrum, FrequencyDirectionSpectrum):
        spectrum_1d: FrequencySpectrum = spectrum.as_frequency_spectrum()
    else:
        spectrum_1d = spectrum

    # Get friction velocity from spectrum
    dataset = friction_velocity(
        spectrum_1d,
        method,
        fmax,
        power,
        directional_spreading_constant,
        phillips_constant_beta,
        grav,
        number_of_bins,
    )

    # Find z0 from Charnock Relation
    z0 = charnock_roughness_length(dataset.friction_velocity, **kwargs)

    if direction_convention == "coming_from_clockwise_north":
        dataset["direction"] = (270.0 - dataset["direction"]) % 360
    elif direction_convention == "going_to_counter_clockwise_east":
        pass
    else:
        raise ValueError(f"Unknown direectional convention: {direction_convention}")

    # Get the wind speed at U10 from loglaw
    return dataset.assign(
        {"u10": dataset.friction_velocity / vonkarman_constant * np.log(10.0 / z0)}
    )


def equilibrium_range_values(
    spectrum: FrequencySpectrum,
    method: _methods,
    fmax=1.25,
    power=4,
    number_of_bins=20,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """

    :param spectrum:
    :param method:
    :param fmax:
    :param power:
    :param number_of_bins:
    :return:
    """

    if method == "peak":
        scaled_spec = spectrum.variance_density * spectrum.frequency**power
        scaled_spec = scaled_spec.fillna(0)
        indices = scaled_spec.argmax(dim="frequency")
        a1 = spectrum.a1.isel({"frequency": indices}).values
        b1 = spectrum.b1.isel({"frequency": indices}).values
        e = scaled_spec.isel({"frequency": indices}).values
        return e, a1, b1

    elif method == "mean":
        scaled_spec = spectrum.variance_density * spectrum.frequency**power

        # Find fmin/fmax
        fmin = 0
        i_min = np.argmin(np.abs(spectrum.frequency.values - fmin), axis=-1)
        i_max = np.argmin(np.abs(spectrum.frequency.values - fmax), axis=-1)
        nf = spectrum.number_of_frequencies

        i_max = i_max + 1 - number_of_bins
        i_max = np.max((i_min + 1, i_max))
        i_max = np.min((i_max, nf - number_of_bins))

        i_counter = 0
        shape = list(spectrum.shape())
        shape[-1] = i_max - i_min
        variance = np.zeros(shape) + np.inf

        #
        # Calculate the variance with respect to a running average mean of numberOfBins
        #
        for iFreq in range(i_min, i_max):
            #
            # Ensure we do not go out of bounds
            iiu = np.min((iFreq + number_of_bins, nf))

            # Ensure there are no 0 contributions (essentially no data)
            m = scaled_spec[..., iFreq:iiu].mean(dim="frequency")
            variance[..., i_counter] = ((scaled_spec[..., iFreq:iiu] - m) ** 2).mean(
                dim="frequency"
            ) / (m**2)
            i_counter = i_counter + 1
            #
        #
        i_min_variance = np.argmin(variance, axis=-1) + i_min

        e = np.zeros(i_min_variance.shape)
        a1 = np.zeros(i_min_variance.shape)
        b1 = np.zeros(i_min_variance.shape)

        index = np.array(list(range(0, spectrum.number_of_spectra)))
        unraveled_index = np.unravel_index(index, i_min_variance.shape)
        i_min_variance = i_min_variance.flatten()

        for ii in range(0, number_of_bins):
            jj = np.clip(i_min_variance + ii, a_min=0, a_max=nf - 1 - number_of_bins)

            indexer = tuple([ind for ind in unraveled_index] + [jj])

            e[unraveled_index] += scaled_spec.values[indexer]
            a1[unraveled_index] += spectrum.a1.values[indexer]
            b1[unraveled_index] += spectrum.b1.values[indexer]
        fac = 1 / number_of_bins
        e *= fac
        a1 *= fac
        b1 *= fac

        return e, a1, b1


def estimate_u10_from_source_terms(
    spectrum: FrequencyDirectionSpectrum,
    balance: SourceTermBalance,
    time_derivative_spectrum=None,
    direction_iteration=False,
    **kwargs,
) -> xarray.Dataset:
    # wind_direction = balance.dissipation.mean_direction_degrees(
    #     spectrum
    # )
    # dissipation_bulk = balance.dissipation.bulk_rate(spectrum)

    # Our first guess is the wind estimate based on the peak
    # equilibrium range approximation
    guess = estimate_u10_from_spectrum(
        spectrum, "peak", direction_convention="going_to_counter_clockwise_east"
    )["u10"]

    u10_estimate = windspeed_and_direction_from_spectra(
        balance=balance,
        guess_u10=guess,
        spectrum=spectrum,
        time_derivative_spectrum=time_derivative_spectrum,
        direction_iteration=direction_iteration,
    )

    return u10_estimate

Functions

def equilibrium_range_values(spectrum: FrequencySpectrum, method: Literal['peak', 'mean'], fmax=1.25, power=4, number_of_bins=20) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]

:param spectrum: :param method: :param fmax: :param power: :param number_of_bins: :return:

Expand source code
def equilibrium_range_values(
    spectrum: FrequencySpectrum,
    method: _methods,
    fmax=1.25,
    power=4,
    number_of_bins=20,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """

    :param spectrum:
    :param method:
    :param fmax:
    :param power:
    :param number_of_bins:
    :return:
    """

    if method == "peak":
        scaled_spec = spectrum.variance_density * spectrum.frequency**power
        scaled_spec = scaled_spec.fillna(0)
        indices = scaled_spec.argmax(dim="frequency")
        a1 = spectrum.a1.isel({"frequency": indices}).values
        b1 = spectrum.b1.isel({"frequency": indices}).values
        e = scaled_spec.isel({"frequency": indices}).values
        return e, a1, b1

    elif method == "mean":
        scaled_spec = spectrum.variance_density * spectrum.frequency**power

        # Find fmin/fmax
        fmin = 0
        i_min = np.argmin(np.abs(spectrum.frequency.values - fmin), axis=-1)
        i_max = np.argmin(np.abs(spectrum.frequency.values - fmax), axis=-1)
        nf = spectrum.number_of_frequencies

        i_max = i_max + 1 - number_of_bins
        i_max = np.max((i_min + 1, i_max))
        i_max = np.min((i_max, nf - number_of_bins))

        i_counter = 0
        shape = list(spectrum.shape())
        shape[-1] = i_max - i_min
        variance = np.zeros(shape) + np.inf

        #
        # Calculate the variance with respect to a running average mean of numberOfBins
        #
        for iFreq in range(i_min, i_max):
            #
            # Ensure we do not go out of bounds
            iiu = np.min((iFreq + number_of_bins, nf))

            # Ensure there are no 0 contributions (essentially no data)
            m = scaled_spec[..., iFreq:iiu].mean(dim="frequency")
            variance[..., i_counter] = ((scaled_spec[..., iFreq:iiu] - m) ** 2).mean(
                dim="frequency"
            ) / (m**2)
            i_counter = i_counter + 1
            #
        #
        i_min_variance = np.argmin(variance, axis=-1) + i_min

        e = np.zeros(i_min_variance.shape)
        a1 = np.zeros(i_min_variance.shape)
        b1 = np.zeros(i_min_variance.shape)

        index = np.array(list(range(0, spectrum.number_of_spectra)))
        unraveled_index = np.unravel_index(index, i_min_variance.shape)
        i_min_variance = i_min_variance.flatten()

        for ii in range(0, number_of_bins):
            jj = np.clip(i_min_variance + ii, a_min=0, a_max=nf - 1 - number_of_bins)

            indexer = tuple([ind for ind in unraveled_index] + [jj])

            e[unraveled_index] += scaled_spec.values[indexer]
            a1[unraveled_index] += spectrum.a1.values[indexer]
            b1[unraveled_index] += spectrum.b1.values[indexer]
        fac = 1 / number_of_bins
        e *= fac
        a1 *= fac
        b1 *= fac

        return e, a1, b1
def estimate_u10_from_source_terms(spectrum: FrequencyDirectionSpectrum, balance: SourceTermBalance, time_derivative_spectrum=None, direction_iteration=False, **kwargs) ‑> xarray.core.dataset.Dataset
Expand source code
def estimate_u10_from_source_terms(
    spectrum: FrequencyDirectionSpectrum,
    balance: SourceTermBalance,
    time_derivative_spectrum=None,
    direction_iteration=False,
    **kwargs,
) -> xarray.Dataset:
    # wind_direction = balance.dissipation.mean_direction_degrees(
    #     spectrum
    # )
    # dissipation_bulk = balance.dissipation.bulk_rate(spectrum)

    # Our first guess is the wind estimate based on the peak
    # equilibrium range approximation
    guess = estimate_u10_from_spectrum(
        spectrum, "peak", direction_convention="going_to_counter_clockwise_east"
    )["u10"]

    u10_estimate = windspeed_and_direction_from_spectra(
        balance=balance,
        guess_u10=guess,
        spectrum=spectrum,
        time_derivative_spectrum=time_derivative_spectrum,
        direction_iteration=direction_iteration,
    )

    return u10_estimate
def estimate_u10_from_spectrum(spectrum: Union[FrequencySpectrumFrequencyDirectionSpectrum], method: Literal['peak', 'mean'] = 'peak', fmax=0.5, power=4, directional_spreading_constant=2.5, phillips_constant_beta=0.012, vonkarman_constant=0.4, grav=9.81, number_of_bins=20, direction_convention: Literal['coming_from_clockwise_north', 'going_to_counter_clockwise_east'] = 'going_to_counter_clockwise_east', **kwargs) ‑> xarray.core.dataset.Dataset
Expand source code
def estimate_u10_from_spectrum(
    spectrum: Union[FrequencySpectrum, FrequencyDirectionSpectrum],
    method: _methods = "peak",
    fmax=0.5,
    power=4,
    directional_spreading_constant=2.5,
    phillips_constant_beta=0.012,
    vonkarman_constant=0.4,
    grav=9.81,
    number_of_bins=20,
    direction_convention: _direction_convention = "going_to_counter_clockwise_east",
    **kwargs,
) -> xarray.Dataset:
    #
    # =========================================================================
    # Required Input
    # =========================================================================
    #
    # f              :: frequencies (in Hz)
    # E              :: Variance densities (in m^2 / Hz )
    #
    # =========================================================================
    # Output
    # =========================================================================
    #
    # U10            :: in m/s
    # Direction      :: in degrees clockwise from North (where wind is *coming from)
    #
    # =========================================================================
    # Named Keywords (parameters to inversion algorithm)
    # =========================================================================
    # Npower = 4     :: exponent for the fitted f^-N spectral tail
    # I      = 2.5   :: Philips Directional Constant
    # beta   = 0.012 :: Equilibrium Constant
    # Kapppa = 0.4   :: Von Karman constant
    # Alpha  = 0.012 :: Constant in estimating z0 from u* in Charnock relation
    # grav   = 9.81  :: Gravitational acceleration
    #
    # =========================================================================
    # Algorithm
    # =========================================================================
    #
    # 1) Find the part of the spectrum that best fits a f^-4 shape
    # 2) Estimate the Phillips equilibrium level "Emean" over that range
    # 3) Use Emean to estimate Wind speed (using Charnock and LogLaw)
    # 4) Calculate mean direction over equilibrium range

    if isinstance(spectrum, FrequencyDirectionSpectrum):
        spectrum_1d: FrequencySpectrum = spectrum.as_frequency_spectrum()
    else:
        spectrum_1d = spectrum

    # Get friction velocity from spectrum
    dataset = friction_velocity(
        spectrum_1d,
        method,
        fmax,
        power,
        directional_spreading_constant,
        phillips_constant_beta,
        grav,
        number_of_bins,
    )

    # Find z0 from Charnock Relation
    z0 = charnock_roughness_length(dataset.friction_velocity, **kwargs)

    if direction_convention == "coming_from_clockwise_north":
        dataset["direction"] = (270.0 - dataset["direction"]) % 360
    elif direction_convention == "going_to_counter_clockwise_east":
        pass
    else:
        raise ValueError(f"Unknown direectional convention: {direction_convention}")

    # Get the wind speed at U10 from loglaw
    return dataset.assign(
        {"u10": dataset.friction_velocity / vonkarman_constant * np.log(10.0 / z0)}
    )
def friction_velocity(spectrum: FrequencySpectrum, method: Literal['peak', 'mean'] = 'peak', fmax: float = 0.5, power: float = 4, directional_spreading_constant: float = 2.5, beta: float = 0.012, grav: float = 9.81, number_of_bins: int = 20) ‑> xarray.core.dataset.Dataset

:param spectrum: :param method: :param fmax: :param power: :param directional_spreading_constant: :param beta: :param grav: :param number_of_bins: :return:

Expand source code
def friction_velocity(
    spectrum: FrequencySpectrum,
    method: _methods = "peak",
    fmax: float = 0.5,
    power: float = 4,
    directional_spreading_constant: float = 2.5,
    beta: float = 0.012,
    grav: float = 9.81,
    number_of_bins: int = 20,
) -> xarray.Dataset:
    """

    :param spectrum:
    :param method:
    :param fmax:
    :param power:
    :param directional_spreading_constant:
    :param beta:
    :param grav:
    :param number_of_bins:
    :return:
    """
    e, a1, b1 = equilibrium_range_values(
        spectrum,
        method=method,
        fmax=fmax,
        power=power,
        number_of_bins=number_of_bins,
    )

    emean = 8.0 * np.pi**3 * e

    # Get friction velocity from spectrum
    friction_velocity_estimate = (
        emean / grav / directional_spreading_constant / beta / 4
    )

    # Estimate direction from tail
    direction = (180.0 / np.pi * np.arctan2(b1, a1)) % 360
    coords = {x: spectrum.dataset[x].values for x in spectrum.dims_space_time}
    return xarray.Dataset(
        data_vars={
            "friction_velocity": (spectrum.dims_space_time, friction_velocity_estimate),
            "direction": (spectrum.dims_space_time, direction),
        },
        coords=coords,
    )