Module ocean_science_utilities.wavespectra.spectrum

import numpy as np
import xarray

from typing import (
    Dict,
    Iterator,
    Hashable,
    List,
    Literal,
    Mapping,
    Optional,
    Union,
)
from warnings import warn

from ocean_science_utilities.interpolate.dataset import (
    interpolate_dataset_grid,
    interpolate_dataset_along_axis,
)
from ocean_science_utilities.tools.grid import midpoint_rule_step
from ocean_science_utilities.tools.math import wrapped_difference
from ocean_science_utilities.tools.time import to_datetime64
from ocean_science_utilities.wavetheory.lineardispersion import (
    inverse_intrinsic_dispersion_relation,
    intrinsic_group_velocity,
)
from ocean_science_utilities.wavespectra.estimators.estimate import (
    estimate_directional_distribution,
    Estimators,
)
from ocean_science_utilities.wavespectra._tools import (
    numba_fill_zeros_or_nan_in_tail,
    spline_peak_frequency,
    _cdf_interpolate_spline,
)

NAME_F: Literal["frequency"] = "frequency"
NAME_D: Literal["direction"] = "direction"
NAME_T: Literal["time"] = "time"
NAME_E: Literal["variance_density"] = "variance_density"
NAME_a1: Literal["a1"] = "a1"
NAME_b1: Literal["b1"] = "b1"
NAME_a2: Literal["a2"] = "a2"
NAME_b2: Literal["b2"] = "b2"
NAME_LAT: Literal["latitude"] = "latitude"
NAME_LON: Literal["longitude"] = "longitude"
NAME_DEPTH: Literal["depth"] = "depth"
NAMES_2D = (NAME_F, NAME_D, NAME_T, NAME_E, NAME_LAT, NAME_LON, NAME_DEPTH)
NAMES_1D = (
    NAME_F,
    NAME_T,
    NAME_E,
    NAME_LAT,
    NAME_LON,
    NAME_a1,
    NAME_b1,
    NAME_a2,
    NAME_b2,
    NAME_DEPTH,
)
SPECTRAL_VARS = (NAME_E, NAME_a1, NAME_b1, NAME_a2, NAME_b2)
SPECTRAL_MOMENTS = (NAME_a1, NAME_b1, NAME_a2, NAME_b2)
SPECTRAL_DIMS = (NAME_F, NAME_D)
SPACE_TIME_DIMS = (NAME_T, NAME_LON, NAME_LAT)


class DatasetWrapper:
    """
    A class that wraps a dataset object and passes through some of its primary
    functionality (get/set etc.). Used here mostly to make explicit what parts
    of the Dataset interface we actually expose in frequency objects. Note that
    we do not claim, or try to obtain, completeness here. If the full capabilities
    of the dataset object are needed we can simply operate directly on the
    dataset object itself.
    """

    def __init__(self, dataset: xarray.Dataset):
        self.dataset = dataset

    def __getitem__(self, item) -> xarray.DataArray:
        return self.dataset.__getitem__(item)

    def __setitem__(self, key, value) -> None:
        return self.dataset.__setitem__(key, value)

    def __copy__(self):
        cls = self.__class__
        return cls(self.dataset.copy())

    def __len__(self) -> int:
        return len(self.dataset)

    def copy(self, deep=True):
        if deep:
            return self.__deepcopy__({})
        else:
            return self.__copy__()

    def __deepcopy__(self, memodict):
        cls = self.__class__
        return cls(self.dataset.copy(deep=True))

    def coords(self) -> xarray.core.coordinates.DatasetCoordinates:
        return self.dataset.coords

    def keys(self):
        return self.dataset.keys()

    def __contains__(self, key: object) -> bool:
        return key in self.dataset

    def __iter__(self) -> Iterator[Hashable]:
        return self.dataset.__iter__()

    def sel(self, *args, method="nearest"):
        cls = type(self)
        dataset = xarray.Dataset()
        for var in self.dataset:
            dataset = dataset.assign({var: self.dataset[var].sel(*args, method=method)})
        return cls(dataset=dataset)

    def isel(self, *args, **kwargs):
        cls = type(self)
        dataset = xarray.Dataset()
        for var in self.dataset:
            dataset = dataset.assign({var: self.dataset[var].isel(*args, **kwargs)})
        return cls(dataset=dataset)
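
# Example (illustrative): a wrapper forwards item access and selection to the
# underlying xarray.Dataset, while the full xarray API stays reachable through the
# dataset attribute:
#
#     wrapped = DatasetWrapper(some_dataset)   # some_dataset: an xarray.Dataset
#     density = wrapped["variance_density"]    # forwarded to the dataset
#     raw = wrapped.dataset                    # operate on xarray directly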


class WaveSpectrum(DatasetWrapper):
    frequency_units = "Hertz"
    angular_units = "Degrees"
    spectral_density_units = "m**2/Hertz"
    angular_convention = (
        "Wave travel direction (going-to), measured anti-clockwise from East"
    )
    bulk_properties = (
        "m0",
        "hm0",
        "tm01",
        "tm02",
        "peak_period",
        "peak_direction",
        "peak_directional_spread",
        "mean_direction",
        "mean_directional_spread",
        "peak_frequency",
        "peak_wavenumber",
        "latitude",
        "longitude",
        "time",
    )

    def __init__(self, dataset: xarray.Dataset):
        super(WaveSpectrum, self).__init__(dataset)

    def __add__(self, other):
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = spectrum.dataset[NAME_E] + other.dataset[NAME_E]
        return spectrum

    def __sub__(self, other):
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = spectrum.dataset[NAME_E] - other.dataset[NAME_E]
        return spectrum

    def __neg__(self):
        """
        Negate self, i.e. -spectrum.
        :return: spectrum with all spectral values taken to have the opposite
            sign.
        """
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = -spectrum.dataset[NAME_E]
        return spectrum

    def __len__(self) -> int:
        return self.number_of_spectra

    def __getitem__(self, item) -> xarray.DataArray:
        if isinstance(item, tuple):
            if len(item) < self.ndims:
                raise ValueError(
                    "Indexing requires same number of inputs"
                    f"as dimensions: {self.ndims}"
                )
            space_time_index = item[: -len(self.dims_spectral)]
        else:
            if not self.ndims == 1:
                raise ValueError(
                    "Indexing requires same number of inputs"
                    f"as dimensions: {self.ndims}"
                )
            space_time_index = []

        dataset = xarray.Dataset()
        for var in self.dataset:
            if var in SPECTRAL_VARS:
                dataset = dataset.assign({var: self.dataset[var].__getitem__(item)})
            else:
                if space_time_index:
                    # array
                    dataset = dataset.assign(
                        {var: self.dataset[var].__getitem__(space_time_index)}
                    )
                else:
                    # Scalar
                    dataset = dataset.assign({var: self.dataset[var]})

        for coor in dataset.coords:
            if coor not in dataset.dims:
                dataset = dataset.reset_coords(str(coor))

        cls = type(self)
        return cls(dataset)

    @property
    def ndims(self) -> int:
        return len(self.dims)

    @property
    def frequency_step(self) -> xarray.DataArray:
        prepend = 2 * self.frequency[0] - self.frequency[1]
        append = 2 * self.frequency[-1] - self.frequency[-2]
        diff = np.diff(self.frequency, append=append, prepend=prepend)
        return xarray.DataArray(
            data=(diff[0:-1] * 0.5 + diff[1:] * 0.5),
            dims=NAME_F,
            coords={NAME_F: self.frequency},
        )
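
    # Note: the step returned here is the midpoint-rule bandwidth of each frequency
    # bin: the average of the distances to the neighbouring frequencies, with the
    # first and last bins using a linearly extrapolated edge.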

    def fillna(self, value=0.0):
        for variable in SPECTRAL_VARS:
            if variable in self.dataset:
                self.dataset[variable] = self.dataset[variable].fillna(value)

    def is_invalid(self) -> xarray.DataArray:
        return self.variance_density.isnull().all(dim=self.dims_spectral)

    def is_valid(self) -> xarray.DataArray:
        return ~self.is_invalid()

    def drop_invalid(self):
        return self._apply_filter(self.is_valid())

    def where(self, condition: xarray.DataArray):
        return self._apply_filter(condition)

    def _apply_filter(self, boolean_mask: xarray.DataArray):
        dataset = xarray.Dataset()
        for var in self.dataset:
            data = self.dataset[var].where(
                boolean_mask.reindex_like(self.dataset[var]), drop=True
            )
            dataset = dataset.assign({var: data})

        cls = type(self)
        return cls(dataset)

    def mean(self, dim, skipna=False):
        """
        Calculate the mean value of the spectrum along the given dimension.
        :param dim: dimension to average over
        :param skipna: whether or not to "skip" nan values; if
            True behaves as np.nanmean
        :return:
        """
        if dim in SPECTRAL_DIMS:
            raise ValueError("Cannot calculate mean over spectral dimensions")

        cls = type(self)
        dataset = xarray.Dataset()
        # Todo: fix averaging over longitude for (prime/anti) meridian issues
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].mean(dim=dim, skipna=skipna)})
        return cls(dataset)

    def flatten(self, flattened_coordinate="linear_index"):
        """
        Serialize the non-spectral dimensions creating a single leading dimension
        without a coordinate.
        """

        # Get the current dimensions and shape
        dims = self.dims_space_time
        coords = self.coords_space_time
        shape = self.space_time_shape()
        if len(shape) == 0:
            length = 1
            shape = (1,)
        else:
            length = np.prod(shape)

        # Calculate the flattened shape
        new_shape = (length,)
        new_spectral_shape = (length, *self.spectral_shape())
        new_dims = [flattened_coordinate] + self.dims_spectral

        linear_index = xarray.DataArray(
            data=np.arange(0, length), dims=flattened_coordinate
        )
        indices = np.unravel_index(linear_index.values, shape)

        dataset = {}
        for index, dim in zip(indices, dims):
            dataset[dim] = xarray.DataArray(
                data=coords[dim].values[index], dims=flattened_coordinate
            )

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                x = xarray.DataArray(
                    data=self.dataset[name].values.reshape(new_spectral_shape),
                    dims=new_dims,
                    coords=self.coords_spectral,
                )
            else:
                x = xarray.DataArray(
                    data=self.dataset[name].values.reshape(new_shape),
                    dims=flattened_coordinate,
                )
            dataset[str(name)] = x

        cls = type(self)
        return cls(xarray.Dataset(dataset))

    def sum(self, dim: str, skipna: bool = False):
        """
        Calculate the sum value of the spectrum along the given dimension.
        :param dim: dimension to sum over
        :param skipna: whether or not to "skip" nan values; if True behaves as np.nansum
        :return:
        """

        if dim in SPECTRAL_DIMS:
            raise ValueError("Cannot calculate sum over spectral dimensions")

        cls = type(self)
        dataset = xarray.Dataset()
        # we assign the average coordinate to the dimension we sum over
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].sum(dim=dim, skipna=skipna)})
        return cls(dataset)

    def std(self, dim: str, skipna: bool = False):
        """
        Calculate the standard deviation of the spectrum along the given dimension.
        :param dim: dimension to calculate standard deviation over
        :param skipna: whether or not to "skip" nan values; if True behaves as np.nanstd
        :return:
        """
        if dim in SPECTRAL_DIMS:
            raise ValueError(
                "Cannot calculate standard deviation over spectral dimensions"
            )

        cls = type(self)
        dataset = xarray.Dataset()
        # we assign the average coordinate to the dimension we calculate the std over
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].std(dim=dim, skipna=skipna)})
        return cls(dataset)

    def shape(self):
        return self.variance_density.shape

    def spectral_shape(self):
        number_of_spectral_dims = len(self.dims_spectral)
        return self.shape()[-number_of_spectral_dims:]

    def space_time_shape(self):
        number_of_spectral_dims = len(self.dims_spectral)
        return self.shape()[:-number_of_spectral_dims]

    def frequency_moment(self, power: int, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Calculate a "frequency moment" over the given range. A frequency moment
        here refers to the integral:

                    Integral-over-frequency-range[ e(f) * f**power ]

        :param power: power of the frequency
        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: frequency moment
        """
        _range = self._range(fmin, fmax)

        # Integrate dataset over frequencies. Make sure to fill any NaN entries
        # with 0 before the integration.
        return (
            (self.e.isel({NAME_F: _range}) * self.frequency[_range] ** power)
            .fillna(0)
            .integrate(coord=NAME_F)
        )
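
    # The bulk parameters defined further below are simple combinations of these
    # frequency moments, e.g.:
    #     m0   = frequency_moment(0)   (total variance)
    #     hm0  = 4 * sqrt(m0)          (significant wave height)
    #     tm01 = m0 / m1               (mean period)
    #     tm02 = sqrt(m0 / m2)         (zero-crossing period)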

    @property
    def number_of_frequencies(self) -> int:
        """
        :return: number of frequencies
        """
        return len(self.frequency)

    @property
    def dims_space_time(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims if x not in (SPECTRAL_DIMS)]

    @property
    def coords_space_time(self) -> Mapping[str, xarray.DataArray]:
        return {dim: self.dataset[dim] for dim in self.dims_space_time}

    @property
    def coords_spectral(self) -> Mapping[str, xarray.DataArray]:
        return {dim: self.dataset[dim] for dim in self.dims_spectral}

    @property
    def dims_spectral(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims if x in (SPECTRAL_DIMS)]

    @property
    def dims(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims]

    @property
    def number_of_spectra(self):
        dims = self.dims_space_time
        if dims:
            shape = 1
            for d in dims:
                shape *= len(self.dataset[d])
            return shape
        else:
            return 1

    @property
    def spectral_values(self) -> xarray.DataArray:
        """
        :return: Spectral levels
        """
        return self.dataset[NAME_E]

    @property
    def radian_frequency(self) -> xarray.DataArray:
        """
        :return: Radian frequency
        """
        data_array = self.dataset[NAME_F] * 2 * np.pi
        data_array.name = "radian_frequency"
        return data_array

    @property
    def latitude(self) -> xarray.DataArray:
        """
        :return: latitudes
        """
        return self.dataset[NAME_LAT]

    @property
    def longitude(self) -> xarray.DataArray:
        """
        :return: longitudes
        """
        return self.dataset[NAME_LON]

    @property
    def time(self) -> xarray.DataArray:
        """
        :return: Time
        """
        return self.dataset[NAME_T]

    @property
    def variance_density(self) -> xarray.DataArray:
        """
        :return: Variance density
        """
        return self.dataset[NAME_E]

    @property
    def values(self) -> np.ndarray:
        """
        Get the raw np representation of the wave spectrum
        :return: Numpy ndarray of the wave spectrum.
        """
        return self.dataset[NAME_E].values

    @property
    def e(self) -> xarray.DataArray:
        """
        :return: 1D spectral values (directionally integrated spectrum).
            Equivalent to self.spectral_values if this is a 1D spectrum.
        """
        return self.dataset[NAME_E]

    @property
    def a1(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment cos(theta)
        """
        return self.dataset[NAME_a1]

    @property
    def b1(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment sin(theta)
        """
        return self.dataset[NAME_b1]

    @property
    def a2(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment cos(2*theta)
        """
        return self.dataset[NAME_a2]

    @property
    def b2(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment sin(2*theta)
        """
        return self.dataset[NAME_b2]

    @property
    def A1(self) -> xarray.DataArray:
        """
        :return: Fourier moment cos(theta)
        """
        return self.a1 * self.e

    @property
    def B1(self) -> xarray.DataArray:
        """
        :return: Fourier moment sin(theta)
        """
        return self.b1 * self.e

    @property
    def A2(self) -> xarray.DataArray:
        """
        :return: Fourier moment cos(2*theta)
        """
        return self.a2 * self.e

    @property
    def B2(self) -> xarray.DataArray:
        """
        :return: Fourier moment sin(2*theta)
        """
        return self.b2 * self.e

    @property
    def frequency(self) -> xarray.DataArray:
        """
        :return: Frequencies (Hz)
        """
        return self.dataset[NAME_F]

    def m0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Zero order frequency moment. Also referred to as variance or energy.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: variance/energy
        """
        return self.frequency_moment(0, fmin, fmax)

    def m1(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        First order frequency moment. Primarily used in calculating a mean
        period measure (Tm01)

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: first order frequency moment.
        """
        return self.frequency_moment(1, fmin, fmax)

    def wave_speed(self) -> xarray.DataArray:
        """
        :return:
        """
        # Note we multiply inverse wavenumber with frequency to force xarray to return
        # a number_of_points by by number of frequencies data structure.
        return (1 / self.wavenumber) * self.radian_frequency

    def wave_age(self, windspeed):
        return self.peak_wave_speed() / windspeed

    def peak_wave_speed(self) -> xarray.DataArray:
        return 2 * np.pi * self.peak_frequency() / self.peak_wavenumber

    @property
    def wavenumber_density(self) -> xarray.DataArray:
        return self.variance_density * self.group_velocity / (np.pi * 2)

    @property
    def saturation_spectrum(self) -> xarray.DataArray:
        return self.wavenumber_density * self.wavenumber**3

    @property
    def slope_spectrum(self) -> xarray.DataArray:
        return self.variance_density * self.wavenumber**2
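
    # The wavenumber density follows from the Jacobian of the dispersion relation,
    # E(k) = E(f) * cg / (2 * pi) (since df/dk = cg / (2 * pi)); the saturation
    # spectrum is E(k) * k**3 and the slope spectrum is E(f) * k**2.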

    def mean_squared_slope(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        _range = self._range(fmin, fmax)

        # Integrate dataset over frequencies. Make sure to fill any NaN entries with
        # 0 before the integration.
        return (
            self.slope_spectrum.fillna(0).isel({NAME_F: _range}).integrate(coord=NAME_F)
        )

    def m2(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Second order frequency moment. Primarily used in calculating the zero
        crossing period (Tm02)

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Second order frequency moment.
        """
        return self.frequency_moment(2, fmin, fmax)

    def hm0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Significant wave height estimated from the spectrum, i.e. waveheight
        h estimated from variance m0. Common notation in literature.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Significant wave height
        """
        return xarray.DataArray(4 * np.sqrt(self.m0(fmin, fmax)))

    def tm01(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Mean period, estimated as the inverse of the center of mass of the
        spectral curve under the 1d spectrum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Mean period
        """
        return self.m0(fmin, fmax) / self.m1(fmin, fmax)

    def tm02(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Zero crossing period based on Rice's spectral estimate.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Zero crossing period
        """
        return xarray.DataArray(np.sqrt(self.m0(fmin, fmax) / self.m2(fmin, fmax)))

    def peak_index(self, fmin: float = 0, fmax: float = np.inf) -> xarray.DataArray:
        """
        Index of the peak frequency of the 1d spectrum within the given range
        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: peak indices
        """
        return self.e.where(self._range(fmin, fmax), 0).argmax(dim=NAME_F)

    def peak_frequency(
        self, fmin=0.0, fmax=np.inf, use_spline=False, **kwargs
    ) -> xarray.DataArray:
        """
        Peak frequency of the spectrum, i.e. frequency at which the spectrum
        obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :param use_spline: Use a spline based interpolation and determine the peak
            frequency from the spline. This allows for a continuous estimate of the
            peak frequency. WARNING: if True the fmin and fmax parameters are IGNORED.
        :return: peak frequency
        """
        if use_spline:
            if not fmin == 0.0 or np.isfinite(fmax):
                warn(
                    "The fmin and fmax parameters are ignored"
                    "if use_spline is set to True"
                )

            data = spline_peak_frequency(self.frequency.values, self.e.values, **kwargs)
            if len(self.dims_space_time) == 0:
                data = data[0]

            return xarray.DataArray(
                data=data,
                coords=self.coords_space_time,
                dims=self.dims_space_time,
            )
        else:
            return self.dataset[NAME_F][self.peak_index(fmin, fmax)]

    def peak_angular_frequency(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Peak frequency of the spectrum, i.e. frequency at which the spectrum
        obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: peak frequency
        """
        return self.peak_frequency(fmin, fmax) * np.pi * 2

    def peak_period(
        self, fmin=0, fmax=np.inf, use_spline=False, **kwargs
    ) -> xarray.DataArray:
        """
        Peak period of the spectrum, i.e. period at which the spectrum
        obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :param use_spline: use a spline based estimate of the peak frequency
            (see peak_frequency)
        :return: peak period
        """
        peak_period = 1 / self.peak_frequency(
            fmin, fmax, use_spline=use_spline, **kwargs
        )
        peak_period.name = "peak period"
        try:
            peak_period = peak_period.drop("frequency")
        except Exception:
            pass
        return peak_period

    def peak_direction(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        index = self.peak_index(fmin, fmax)
        return self._mean_direction(
            self.a1.isel(**{NAME_F: index}), self.b1.isel(**{NAME_F: index})
        )

    def peak_directional_spread(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        index = self.peak_index(fmin, fmax)
        a1 = self.a1.isel(**{NAME_F: index})
        b1 = self.b1.isel(**{NAME_F: index})
        return self._spread(a1, b1)

    @staticmethod
    def _mean_direction(a1: xarray.DataArray, b1: xarray.DataArray) -> xarray.DataArray:
        return xarray.DataArray(np.arctan2(b1, a1) * 180 / np.pi)

    @staticmethod
    def _spread(a1: xarray.DataArray, b1: xarray.DataArray) -> xarray.DataArray:
        return xarray.DataArray(
            np.sqrt(2 - 2 * np.sqrt(a1**2 + b1**2)) * 180 / np.pi
        )

    @property
    def mean_direction_per_frequency(self) -> xarray.DataArray:
        return self._mean_direction(self.a1, self.b1)

    @property
    def mean_spread_per_frequency(self) -> xarray.DataArray:
        return self._spread(self.a1, self.b1)

    def _spectral_weighted(self, property: xarray.DataArray, fmin=0, fmax=np.inf):
        range = {NAME_F: self._range(fmin, fmax)}

        property = property.fillna(0)
        return np.trapz(
            property.isel(**range) * self.e.isel(**range), self.frequency[range]
        ) / self.m0(fmin, fmax)

    def mean_direction(self, fmin=0, fmax=np.inf):
        return self._mean_direction(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))

    def mean_directional_spread(self, fmin=0, fmax=np.inf):
        return self._spread(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))

    def mean_a1(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.a1, fmin, fmax)

    def mean_b1(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.b1, fmin, fmax)

    def mean_a2(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.a2, fmin, fmax)

    def mean_b2(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.b2, fmin, fmax)

    @property
    def depth(self) -> xarray.DataArray:
        depth = self.dataset[NAME_DEPTH]
        return xarray.where(depth.isnull(), np.inf, depth)

    @property
    def group_velocity(self) -> xarray.DataArray:
        depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
        k = self.wavenumber.values
        depth = depth * np.ones(k.shape)

        # Construct the output coordinates and dimension of the data array
        return_dimensions = (*self.dims_space_time, NAME_F)
        coords = {}
        for dim in return_dimensions:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=intrinsic_group_velocity(k, depth),
            dims=return_dimensions,
            coords=coords,
        )

    @property
    def wavenumber(self) -> xarray.DataArray:
        """
        Determine the wavenumbers for the frequencies in the spectrum. Note that since
        the dispersion relation depends on depth, the returned wavenumber array has the
        dimensions of the depth array plus a trailing frequency dimension.

        :return: wavenumbers
        """

        # For numba (used in the dispersion relation) we need raw np arrays of
        # the correct dimension
        depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
        radian_frequency = self.radian_frequency.expand_dims(dim=self.depth.dims).values

        # Broadcasting does not work inside the numba implementation, we explicitly
        # need to construct arrays of the correct input dimension.
        depth_shape = depth.shape
        radian_frequency_shape = radian_frequency.shape

        depth = depth * np.ones(radian_frequency_shape)
        radian_frequency = np.ones(depth_shape) * radian_frequency

        # Construct the output coordinates and dimension of the data array
        return_dimensions = (*self.dims_space_time, NAME_F)
        coords = {}
        for dim in return_dimensions:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=inverse_intrinsic_dispersion_relation(radian_frequency, depth),
            dims=return_dimensions,
            coords=coords,
        )
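
    # The inversion solves the intrinsic linear dispersion relation (for surface
    # gravity waves omega**2 = g * k * tanh(k * depth)) for k at every
    # frequency/depth combination.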

    @property
    def wavelength(self) -> xarray.DataArray:
        return 2 * np.pi / self.wavenumber

    @property
    def peak_wavenumber(self) -> xarray.DataArray:
        index = self.peak_index()
        # Construct the output coordinates and dimension of the data array
        coords = {}
        for dim in self.dims_space_time:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=inverse_intrinsic_dispersion_relation(
                self.radian_frequency[index].values, self.depth.values
            ),
            dims=self.dims_space_time,
            coords=coords,
        )

    def bulk_variables(self) -> xarray.Dataset:
        dataset = xarray.Dataset()
        dataset["significant_waveheight"] = self.significant_waveheight
        dataset["mean_period"] = self.mean_period
        dataset["peak_period"] = self.peak_period()
        dataset["peak_direction"] = self.peak_direction()
        dataset["peak_directional_spread"] = self.peak_directional_spread()
        dataset["mean_direction"] = self.mean_direction()
        dataset["mean_directional_spread"] = self.mean_directional_spread()
        dataset["peak_frequency"] = self.peak_frequency()
        dataset["latitude"] = self.latitude
        dataset["longitude"] = self.longitude
        dataset["timestamp"] = self.time
        return dataset

    @property
    def significant_waveheight(self) -> xarray.DataArray:
        return self.hm0()

    @property
    def mean_period(self) -> xarray.DataArray:
        return self.tm01()

    @property
    def zero_crossing_period(self) -> xarray.DataArray:
        return self.tm02()

    def cdf(self) -> xarray.DataArray:
        """

        :return:
        """
        frequency_step = self.frequency_step
        integration_frequencies = np.concatenate(
            ([0], np.cumsum(frequency_step.values))
        )
        integration_frequencies = (
            integration_frequencies
            - frequency_step.values[0] / 2
            + self.frequency.values[0]
        )
        values = (self.variance_density * frequency_step).values

        frequency_axis = self.dims.index(NAME_F)

        cumsum = np.cumsum(values, axis=frequency_axis)
        # cumsum =  self.variance_density.cumulative_integrate(coord=NAME_F)
        # return cumsum
        shape = list(cumsum.shape)
        shape[frequency_axis] = 1

        cumsum = np.concatenate((np.zeros(shape), cumsum), axis=frequency_axis)

        coords = {str(coor): self.coords()[coor].values for coor in self.coords()}
        coords[NAME_F] = integration_frequencies
        return xarray.DataArray(data=cumsum, dims=self.dims, coords=coords)

    def interpolate(
        self,
        coordinates: Dict[str, Union[xarray.DataArray, np.ndarray]],
        extrapolation_value: float = 0.0,
    ):
        dataset = self.__class__(interpolate_dataset_grid(coordinates, self.dataset))
        dataset.fillna(extrapolation_value)
        return dataset

    def extrapolate_tail(
        self,
        end_frequency,
        power=None,
        tail_energy=None,
        tail_bounds=None,
        tail_moments=None,
        tail_frequency=None,
    ) -> "FrequencySpectrum":
        """
        Extrapolate the tail using the given power.
        :param end_frequency: frequency to extrapolate to
        :param power: power to use. If None, a best fit -4 or -5 tail is used.
        :return: FrequencySpectrum with the tail extended up to end_frequency
        """
        e = self.e
        a1 = self.a1
        b1 = self.b1
        a2 = self.a2
        b2 = self.b2

        frequency = self.frequency.values
        frequency_delta = frequency[-1] - frequency[-2]
        n = int((end_frequency - frequency[-1]) / frequency_delta) + 1

        fstart = frequency[-1] + frequency_delta
        fend = frequency[-1] + n * frequency_delta

        if tail_frequency is None:
            tail_frequency = np.linspace(fstart, fend, n, endpoint=True)

        tail_frequency = xarray.DataArray(
            data=tail_frequency, coords={"frequency": tail_frequency}, dims="frequency"
        )
        variance_density = xarray.concat(
            (e, e.isel(frequency=-1) * xarray.zeros_like(tail_frequency)),
            dim="frequency",
        )

        tail_a1 = a1.isel(frequency=-1) if tail_moments is None else tail_moments["a1"]
        tail_b1 = b1.isel(frequency=-1) if tail_moments is None else tail_moments["b1"]
        tail_a2 = a2.isel(frequency=-1) if tail_moments is None else tail_moments["a2"]
        tail_b2 = b2.isel(frequency=-1) if tail_moments is None else tail_moments["b2"]

        a1 = xarray.concat(
            (a1, tail_a1 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        b1 = xarray.concat(
            (b1, tail_b1 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        a2 = xarray.concat(
            (a2, tail_a2 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        b2 = xarray.concat(
            (b2, tail_b2 * xarray.ones_like(tail_frequency)), dim="frequency"
        )

        if tail_energy is not None:
            if isinstance(tail_energy, xarray.DataArray):
                tail_energy = tail_energy.values

            tail_information = (tail_bounds, tail_energy)
        else:
            tail_information = None

        variance_density = xarray.DataArray(
            data=numba_fill_zeros_or_nan_in_tail(
                variance_density.values,
                variance_density.frequency.values,
                power,
                tail_information=tail_information,
            ),
            dims=a1.dims,
            coords=a1.coords,
        )

        dataset = xarray.Dataset(
            {
                "variance_density": variance_density,
                "a1": a1,
                "b1": b1,
                "a2": a2,
                "b2": b2,
            }
        )

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                continue
            else:
                dataset = dataset.assign({name: self.dataset[name]})

        return FrequencySpectrum(dataset)

    def bandpass(self, fmin: float = 0, fmax: float = np.inf):
        dataset = xarray.Dataset()

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                data = self.dataset[name].where(
                    (self.frequency >= fmin) & (self.frequency < fmax), drop=True
                )
                dataset = dataset.assign({name: data})
            else:
                dataset = dataset.assign({name: self.dataset[name]})
        cls = type(self)
        return cls(dataset)

    def interpolate_frequency(
        self,
        new_frequencies: Union[xarray.DataArray, np.ndarray],
        extrapolation_value: float = 0.0,
    ):
        obj = self.__class__(
            interpolate_dataset_along_axis(
                new_frequencies, self.dataset, coordinate_name="frequency"
            )
        )
        obj.fillna(extrapolation_value)
        return obj

    def _range(self, fmin=0.0, fmax=np.inf) -> np.ndarray:
        return (self.dataset[NAME_F].values >= fmin) & (
            self.dataset[NAME_F].values < fmax
        )

    def save_as_netcdf(self, path):
        self.dataset.to_netcdf(path)

    def multiply(
        self,
        array: np.ndarray,
        dimensions: Optional[List[str]] = None,
        inplace: bool = False,
    ):
        """
        Multiply the variance density with the given np array. Broadcasting is
        performed automatically if dimensions are provided. If no dimensions are
        provided the array needs to have the exact same shape as the variance
        density array.

        :param array: Array to multiply with the variance density
        :param dimensions: Dimensions of the array
        :param inplace: if True modify this spectrum in place, otherwise operate on
            a copy
        :return: the modified spectrum (self if inplace is True, otherwise a copy)
        """
        if inplace:
            output = self
        else:
            output = self.copy()

        coords = {}
        shape = array.shape
        if dimensions is None:
            if shape != self.shape():
                raise ValueError(
                    "If no dimensions are provided the array must have the exact same"
                    "shape as the variance density array."
                )

            output.dataset[NAME_E] = self.dataset[NAME_E] * array
            return output

        if len(shape) != len(dimensions):
            raise ValueError(
                "The dimensions of the input array must match the number of"
                "dimension labels"
            )

        for length, dimension in zip(shape, dimensions):
            if dimension not in self.dims:
                raise ValueError(
                    f"Dimension {dimension} not a valid dimension of the"
                    "spectral object."
                )
            coords[dimension] = self.dataset[dimension].values

            if len(self.dataset[dimension].values) != length:
                raise ValueError(
                    f"Array length along the dimension {dimension} does not match the"
                    " length of the coordinate of the same name in the spctral object."
                )

        data = xarray.DataArray(data=array, coords=coords, dims=dimensions)
        output.dataset[NAME_E] = self.dataset[NAME_E] * data
        return output
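
    # Example (illustrative): apply a frequency dependent scaling to the variance
    # density, broadcasting over all other dimensions:
    #
    #     window = np.ones(len(spectrum.frequency))
    #     scaled = spectrum.multiply(window, dimensions=["frequency"])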


class FrequencyDirectionSpectrum(WaveSpectrum):
    def __init__(self, dataset: xarray.Dataset):
        super(FrequencyDirectionSpectrum, self).__init__(dataset)
        for name in NAMES_2D:
            if name not in dataset and name not in dataset.coords:
                raise ValueError(
                    f"Required variable/coordinate {name} is"
                    f" not specified in the dataset"
                )

    def __len__(self):
        return int(np.prod(self.spectral_values.shape[:-2]))

    @property
    def direction_step(self) -> xarray.DataArray:
        difference = wrapped_difference(
            np.diff(self.direction.values, append=self.direction[0]), period=360
        )
        return xarray.DataArray(
            data=difference, coords={NAME_D: self.direction.values}, dims=[NAME_D]
        )

    @property
    def radian_direction(self) -> xarray.DataArray:
        data_array = self.dataset[NAME_D] * np.pi / 180
        data_array.name = "radian_direction"
        return data_array

    def _directionally_integrate(
        self, data_array: xarray.DataArray
    ) -> xarray.DataArray:
        return (data_array * self.direction_step).sum(NAME_D, skipna=True)

    @property
    def e(self) -> xarray.DataArray:
        return self._directionally_integrate(self.dataset[NAME_E])

    @property
    def a1(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.cos(self.radian_direction)
            )
            / self.e
        )

    @property
    def b1(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.sin(self.radian_direction)
            )
            / self.e
        )

    @property
    def a2(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.cos(2 * self.radian_direction)
            )
            / self.e
        )

    @property
    def b2(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.sin(2 * self.radian_direction)
            )
            / self.e
        )

    @property
    def direction(self) -> xarray.DataArray:
        return self.dataset[NAME_D]

    def as_frequency_spectrum(self):
        dataset = {
            "a1": self.a1,
            "b1": self.b1,
            "a2": self.a2,
            "b2": self.b2,
            "variance_density": self.e,
        }
        for name in self.dataset:
            if name not in SPECTRAL_VARS:
                dataset[str(name)] = self.dataset[name]

        return FrequencySpectrum(xarray.Dataset(dataset))

    def spectrum_1d(self):
        """
        Will be deprecated.
        :return:
        """
        warn(
            'spectrum_1d method will be removed, use "as_frequency_spectrum" instead',
            DeprecationWarning,
            stacklevel=2,
        )
        return self.as_frequency_spectrum()

    def differentiate(self, coordinate=None, **kwargs) -> "FrequencyDirectionSpectrum":
        if coordinate is None:
            coordinate = "time"

        if coordinate not in self.dataset:
            raise ValueError(f"Coordinate {coordinate} does not exist in the dataset")

        data = {
            NAME_E: (
                self.dims,
                self.variance_density.differentiate(
                    coordinate, datetime_unit="s", **kwargs
                ).values,
            )
        }
        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencyDirectionSpectrum(
            xarray.Dataset(data_vars=data, coords=self.coords())
        )

    @property
    def number_of_directions(self) -> int:
        return len(self.direction)


class FrequencySpectrum(WaveSpectrum):
    def __init__(self, dataset: xarray.Dataset):
        super(FrequencySpectrum, self).__init__(dataset)
        for name in NAMES_1D:
            if name not in dataset and name not in dataset.coords:
                raise ValueError(
                    f"Required variable/coordinate {name} is"
                    f" not specified in the dataset"
                )

    def __len__(self):
        return int(np.prod(self.spectral_values.shape[:-1]))

    def interpolate_frequency(
        self: "FrequencySpectrum",
        new_frequencies: Union[xarray.DataArray, np.ndarray],
        extrapolation_value=0.0,
        method: Literal["nearest", "linear", "spline"] = "linear",
        **kwargs,
    ) -> "FrequencySpectrum":
        if isinstance(new_frequencies, xarray.DataArray):
            new_frequencies = new_frequencies.values

        if method == "spline":
            self.fillna(0.0)
            frequency_axis = self.dims.index(NAME_F)
            interpolated_data = cumulative_frequency_interpolation_1d_variable(
                new_frequencies, self.dataset, frequency_axis=frequency_axis, **kwargs
            )
            object = FrequencySpectrum(interpolated_data)
            object.fillna(extrapolation_value)
            return object
        elif method == "linear":
            return self.interpolate(
                {NAME_F: new_frequencies},
                extrapolation_value=extrapolation_value,
                nearest_neighbour=False,
            )
        elif method == "nearest":
            return self.interpolate(
                {NAME_F: new_frequencies},
                extrapolation_value=extrapolation_value,
                nearest_neighbour=True,
            )
        else:
            raise ValueError(f"Unknown interpolation method: {method}")

    def interpolate(
        self: "FrequencySpectrum",
        coordinates,
        extrapolation_value=0.0,
        nearest_neighbour=False,
    ) -> "FrequencySpectrum":
        """

        :param coordinates:
        :return:
        """
        _dataset = xarray.Dataset()
        _moments = [NAME_a1, NAME_b1, NAME_a2, NAME_b2]

        # For physical reasons it is better to interpolate the scaled moments -
        # as opposed to the normalized moments. For the dataset we interpolate
        # we set a1: to A1 etc. Afterwards we scale the output back to the normalized
        # state.
        for name in self.dataset:
            _name = str(name)
            if _name in _moments:
                _dataset = _dataset.assign({_name: getattr(self, _name) * self.e})
            else:
                _dataset = _dataset.assign({_name: self.dataset[_name]})

        interpolated_data = interpolate_dataset_grid(
            coordinates, _dataset, nearest_neighbour
        )
        for name in _moments:
            interpolated_data[name] = (
                interpolated_data[name] / interpolated_data[NAME_E]
            )

        object = FrequencySpectrum(interpolated_data)
        object.fillna(extrapolation_value)
        return object

    def down_sample(self, frequencies):
        cdf = self.cdf()

        frequency_step = midpoint_rule_step(frequencies)
        sampling_frequencies = np.concatenate(([0], np.cumsum(frequency_step)))
        sampling_frequencies = (
            sampling_frequencies - frequency_step[0] / 2 + frequencies[0]
        )

        dims = self.dims
        sampled_cdf = cdf.sel({"frequency": sampling_frequencies}, method="nearest")
        data = {
            NAME_E: (dims, sampled_cdf.diff(dim="frequency").values / frequency_step),
            NAME_a1: (
                dims,
                self.a1.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_b1: (
                dims,
                self.b1.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_a2: (
                dims,
                self.a2.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_b2: (
                dims,
                self.b2.sel({"frequency": frequencies}, method="nearest").values,
            ),
        }

        coords = {x: self.dataset[x].values for x in self.dims}
        coords[NAME_F] = frequencies

        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencySpectrum(xarray.Dataset(data_vars=data, coords=coords))
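
    # In as_frequency_direction_spectrum below the 2D spectrum is reconstructed as
    # E(f, theta) = e(f) * D(theta; f), where the directional distribution D is
    # estimated from the directional moments a1, b1, a2 and b2 with the requested
    # estimator (default "mem2").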

    def as_frequency_direction_spectrum(
        self,
        number_of_directions,
        method: Estimators = "mem2",
        solution_method="scipy",
    ) -> "FrequencyDirectionSpectrum":
        direction = np.linspace(0, 360, number_of_directions, endpoint=False)

        output_array = (
            estimate_directional_distribution(
                self.a1.values,
                self.b1.values,
                self.a2.values,
                self.b2.values,
                direction,
                method=method,
                solution_method=solution_method,
            )
            * self.e.values[..., None]
        )

        dims = self.dims_space_time + [NAME_F, NAME_D]
        coords = {x: self.dataset[x].values for x in self.dims}
        coords[NAME_D] = direction

        data = {NAME_E: (dims, output_array)}
        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencyDirectionSpectrum(xarray.Dataset(data_vars=data, coords=coords))


def create_1d_spectrum(
    frequency: np.ndarray,
    variance_density: np.ndarray,
    time: Union[np.ndarray, float],
    latitude: Union[np.ndarray, float],
    longitude: Union[np.ndarray, float],
    a1: Optional[np.ndarray] = None,
    b1: Optional[np.ndarray] = None,
    a2: Optional[np.ndarray] = None,
    b2: Optional[np.ndarray] = None,
    depth: Union[np.ndarray, float] = np.inf,
    dims=(NAME_T, NAME_F),
) -> FrequencySpectrum:
    if a1 is None:
        a1 = np.nan + np.ones_like(variance_density)
    if b1 is None:
        b1 = np.nan + np.ones_like(variance_density)
    if a2 is None:
        a2 = np.nan + np.ones_like(variance_density)
    if b2 is None:
        b2 = np.nan + np.ones_like(variance_density)

    variables = {
        NAME_T: np.atleast_1d(to_datetime64(time)),
        NAME_LAT: np.atleast_1d(latitude),
        NAME_LON: np.atleast_1d(longitude),
        NAME_DEPTH: np.atleast_1d(depth),
        NAME_F: frequency,
        NAME_E: variance_density,
        NAME_a1: a1,
        NAME_b1: b1,
        NAME_a2: a2,
        NAME_b2: b2,
    }

    return FrequencySpectrum(create_spectrum_dataset(dims, variables))


def create_2d_spectrum(
    frequency: np.ndarray,
    direction: np.ndarray,
    variance_density: np.ndarray,
    time,
    latitude: Union[np.ndarray, float, None],
    longitude: Union[np.ndarray, float, None],
    dims=(NAME_T, NAME_F, NAME_D),
    depth: Union[np.ndarray, float] = np.inf,
) -> FrequencyDirectionSpectrum:
    """
    :param frequency:
    :param direction:
    :param variance_density:
    :param time:
    :param latitude:
    :param longitude:
    :param dims:
    :param depth:
    :return:
    """

    variables = {
        NAME_T: np.atleast_1d(to_datetime64(time)),
        NAME_LAT: np.atleast_1d(latitude),
        NAME_LON: np.atleast_1d(longitude),
        NAME_DEPTH: np.atleast_1d(depth),
        NAME_F: frequency,
        NAME_D: direction,
        NAME_E: variance_density,
    }
    return FrequencyDirectionSpectrum(create_spectrum_dataset(dims, variables))


def create_spectrum_dataset(dims, variables) -> xarray.Dataset:
    independent_variables = []
    for dim in dims:
        if dim in variables:
            independent_variables.append(dim)

    dependent_variables = [x for x in variables if x not in independent_variables]

    spectral_coords = {k: variables[k] for k in independent_variables}
    spatial_coords = {
        k: variables[k] for k in independent_variables if k not in SPECTRAL_DIMS
    }

    dataset = xarray.Dataset()
    for variable in dependent_variables:
        if variable in SPECTRAL_VARS:
            coords = spectral_coords
        else:
            coords = spatial_coords
        dims = [k for k in coords]

        if dims:
            dataset = dataset.assign(
                {
                    variable: xarray.DataArray(
                        data=variables[variable], dims=dims, coords=coords
                    )
                }
            )

        else:
            if len(variables[variable]) == 1:
                # If no coordinate is known, and the variable has length 1, we add it
                # as a scalar.
                data = variables[variable][0]
            else:
                # otherwise we add without coordinate/dimension.
                data = xarray.DataArray(data=variables[variable])

            dataset = dataset.assign({variable: data})
    return dataset


def load_spectrum_from_netcdf(
    filename_or_obj,
) -> Union[FrequencySpectrum, FrequencyDirectionSpectrum]:
    """
    Load a spectrum from a netcdf file.

    :param filename_or_obj: path to the netcdf file (or a file-like object) to read
    :return: FrequencyDirectionSpectrum if the file contains a direction coordinate,
        otherwise a FrequencySpectrum
    """
    dataset = xarray.open_dataset(filename_or_obj=filename_or_obj)
    if NAME_D in dataset.coords:
        return FrequencyDirectionSpectrum(dataset=dataset)
    else:
        return FrequencySpectrum(dataset=dataset)


def fill_zeros_or_nan_in_tail(
    spectrum: WaveSpectrum,
    power=None,
    tail_energy=None,
    tail_bounds=None,
) -> FrequencySpectrum:
    variance_density = spectrum.e
    a1 = spectrum.a1
    b1 = spectrum.b1
    a2 = spectrum.a2
    b2 = spectrum.b2

    if tail_energy is not None:
        if isinstance(tail_energy, xarray.DataArray):
            tail_energy = tail_energy.values

        tail_information = (tail_bounds, tail_energy)
    else:
        tail_information = None

    variance_density = xarray.DataArray(
        data=numba_fill_zeros_or_nan_in_tail(
            variance_density.values,
            variance_density.frequency.values,
            power,
            tail_information=tail_information,
        ),
        dims=a1.dims,
        coords=a1.coords,
    )

    dataset = xarray.Dataset(
        {
            "variance_density": variance_density,
            "a1": a1,
            "b1": b1,
            "a2": a2,
            "b2": b2,
        }
    )

    for name in spectrum.dataset:
        if name in SPECTRAL_VARS:
            continue
        else:
            dataset = dataset.assign({name: spectrum.dataset[name]})

    return FrequencySpectrum(dataset)


def cumulative_frequency_interpolation_1d_variable(
    interpolation_frequency, dataset: xarray.Dataset, **kwargs
):
    """
    To interpolate the spectrum we first calculate a cumulative distribution function
    from the spectrum (which is essentially a pdf). We then interpolate the CDF with
    a spline and differentiate the result.

    :param interpolation_frequency:
    :param dataset:
    :return:
    """

    _dataset = xarray.Dataset()

    # Copy over all non spectral vars
    for name in dataset:
        _name = str(name)
        if _name not in SPECTRAL_VARS:
            _dataset = _dataset.assign({_name: dataset[_name]})

    coords = {
        str(_coor_name): dataset[str(_coor_name)]
        for _coor_name in dataset[NAME_E].coords
    }
    coords[NAME_F] = interpolation_frequency
    dims = dataset[NAME_E].dims

    # Interpolate energy
    interpolated_cdf_spline = _cdf_interpolate_spline(
        dataset[NAME_F].values,
        dataset[NAME_E].values,
        monotone_interpolation=kwargs.get("monotone_interpolation", True),
        frequency_axis=kwargs.get("frequency_axis", -1),
    )
    interpolated_energy = interpolated_cdf_spline.derivative()(interpolation_frequency)

    _dataset = _dataset.assign(
        {
            NAME_E: xarray.DataArray(
                data=interpolated_energy,
                coords=coords,
                dims=dims,
            )
        }
    )

    msk = interpolated_energy > 0

    for _name in SPECTRAL_MOMENTS:
        interpolated_densities_spline = _cdf_interpolate_spline(
            dataset[NAME_F].values,
            dataset[_name].values * dataset[NAME_E].values,
            monotone_interpolation=kwargs.get("monotone_interpolation_moments", False),
        )
        interpolated_densities = interpolated_densities_spline.derivative()(
            interpolation_frequency
        )
        # Avoid division by zero
        interpolated_densities[msk] = (
            interpolated_densities[msk] / interpolated_energy[msk]
        )

        _dataset = _dataset.assign(
            {
                _name: xarray.DataArray(
                    data=interpolated_densities,
                    coords=coords,
                    dims=dims,
                )
            }
        )

    return _dataset

Functions

def create_1d_spectrum(frequency: numpy.ndarray, variance_density: numpy.ndarray, time: Union[numpy.ndarray, float], latitude: Union[numpy.ndarray, float], longitude: Union[numpy.ndarray, float], a1: Optional[numpy.ndarray] = None, b1: Optional[numpy.ndarray] = None, a2: Optional[numpy.ndarray] = None, b2: Optional[numpy.ndarray] = None, depth: Union[numpy.ndarray, float] = inf, dims=('time', 'frequency')) ‑> FrequencySpectrum
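
A minimal usage sketch (illustrative, not generated from the module; the spectral shape, timestamp and location are assumptions, and to_datetime64 is assumed to accept a numpy datetime64 value):

    import numpy as np
    from ocean_science_utilities.wavespectra.spectrum import create_1d_spectrum

    frequency = np.linspace(0.05, 0.5, 46)  # Hz
    variance_density = np.exp(-(((frequency - 0.1) / 0.02) ** 2))[None, :]  # (time, frequency)

    spectrum = create_1d_spectrum(
        frequency=frequency,
        variance_density=variance_density,
        time=np.datetime64("2023-01-01T00:00:00"),
        latitude=0.0,
        longitude=0.0,
    )
    print(spectrum.hm0().values)  # significant wave height computed from m0
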
def create_2d_spectrum(frequency: numpy.ndarray, direction: numpy.ndarray, variance_density: numpy.ndarray, time, latitude: Union[numpy.ndarray, float, ForwardRef(None)], longitude: Union[numpy.ndarray, float, ForwardRef(None)], dims=('time', 'frequency', 'direction'), depth: Union[numpy.ndarray, float] = inf) ‑> FrequencyDirectionSpectrum

:param frequency: :param direction: :param variance_density: :param time: :param latitude: :param longitude: :param dims: :param depth: :return:
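
A comparable sketch for a directional spectrum (again illustrative; a flat directional distribution is assumed):

    import numpy as np
    from ocean_science_utilities.wavespectra.spectrum import create_2d_spectrum

    frequency = np.linspace(0.05, 0.5, 46)  # Hz
    direction = np.linspace(0, 360, 36, endpoint=False)  # degrees
    energy = np.exp(-(((frequency - 0.1) / 0.02) ** 2))
    variance_density = np.ones((1, 46, 36)) * energy[None, :, None] / 360.0  # (time, frequency, direction)

    spectrum_2d = create_2d_spectrum(
        frequency=frequency,
        direction=direction,
        variance_density=variance_density,
        time=np.datetime64("2023-01-01T00:00:00"),
        latitude=0.0,
        longitude=0.0,
    )
    spectrum_1d = spectrum_2d.as_frequency_spectrum()  # integrate over direction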

Expand source code
def create_2d_spectrum(
    frequency: np.ndarray,
    direction: np.ndarray,
    variance_density: np.ndarray,
    time,
    latitude: Union[np.ndarray, float, None],
    longitude: Union[np.ndarray, float, None],
    dims=(NAME_T, NAME_F, NAME_D),
    depth: Union[np.ndarray, float] = np.inf,
) -> FrequencyDirectionSpectrum:
    """
    :param frequency:
    :param direction:
    :param variance_density:
    :param time:
    :param latitude:
    :param longitude:
    :param dims:
    :param depth:
    :return:
    """

    variables = {
        NAME_T: np.atleast_1d(to_datetime64(time)),
        NAME_LAT: np.atleast_1d(latitude),
        NAME_LON: np.atleast_1d(longitude),
        NAME_DEPTH: np.atleast_1d(depth),
        NAME_F: frequency,
        NAME_D: direction,
        NAME_E: variance_density,
    }
    return FrequencyDirectionSpectrum(create_spectrum_dataset(dims, variables))
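A minimal usage sketch (illustrative, not from the module source) for a single frequency-direction spectrum; the random variance density below stands in for real data.

import numpy as np

from ocean_science_utilities.wavespectra.spectrum import create_2d_spectrum

frequency = np.linspace(0.05, 0.5, 36)
direction = np.linspace(0, 360, 24, endpoint=False)
# Arbitrary non-negative spectral values with shape (time, frequency, direction).
variance_density = np.random.default_rng(0).random((1, 36, 24))

spectrum_2d = create_2d_spectrum(
    frequency=frequency,
    direction=direction,
    variance_density=variance_density,
    time=np.datetime64("2023-01-01T00:00:00"),
    latitude=36.0,
    longitude=-122.0,
)
print(spectrum_2d.hm0().values, spectrum_2d.number_of_directions)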
def create_spectrum_dataset(dims, variables) ‑> xarray.core.dataset.Dataset
Expand source code
def create_spectrum_dataset(dims, variables) -> xarray.Dataset:
    independent_variables = []
    for dim in dims:
        if dim in variables:
            independent_variables.append(dim)

    dependent_variables = [x for x in variables if x not in independent_variables]

    spectral_coords = {k: variables[k] for k in independent_variables}
    spatial_coords = {
        k: variables[k] for k in independent_variables if k not in SPECTRAL_DIMS
    }

    dataset = xarray.Dataset()
    for variable in dependent_variables:
        if variable in SPECTRAL_VARS:
            coords = spectral_coords
        else:
            coords = spatial_coords
        dims = [k for k in coords]

        if dims:
            dataset = dataset.assign(
                {
                    variable: xarray.DataArray(
                        data=variables[variable], dims=dims, coords=coords
                    )
                }
            )

        else:
            if len(variables[variable]) == 1:
                # If no coordinate is known, and the variable has length 1, we add it
                # as a scalar.
                data = variables[variable][0]
            else:
                # otherwise we add without coordinate/dimension.
                data = xarray.DataArray(data=variables[variable])

            dataset = dataset.assign({variable: data})
    return dataset
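The sketch below (illustrative values only) shows how dims controls the result: names listed in dims become coordinates, and every other entry becomes a data variable defined on either the spectral or the spatial coordinates.

import numpy as np

from ocean_science_utilities.wavespectra.spectrum import create_spectrum_dataset

dims = ("time", "frequency")
variables = {
    "time": np.array([np.datetime64("2023-01-01T00:00:00")]),
    "frequency": np.linspace(0.05, 0.5, 36),
    "variance_density": np.zeros((1, 36)),  # spectral variable -> (time, frequency)
    "latitude": np.array([36.0]),           # spatial variable  -> (time,)
    "longitude": np.array([-122.0]),
}
dataset = create_spectrum_dataset(dims, variables)
print(dataset)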
def cumulative_frequency_interpolation_1d_variable(interpolation_frequency, dataset: xarray.core.dataset.Dataset, **kwargs)

To interpolate the spectrum we first calculate a cumulative distribution function (CDF) from the spectrum (which is essentially a pdf). We then interpolate the CDF with a spline and differentiate the result.

:param interpolation_frequency: :param dataset: :return:

Expand source code
def cumulative_frequency_interpolation_1d_variable(
    interpolation_frequency, dataset: xarray.Dataset, **kwargs
):
    """
    To interpolate the spectrum we first calculate a cumulative distribution function
    (CDF) from the spectrum (which is essentially a pdf). We then interpolate the CDF
    with a spline and differentiate the result.

    :param interpolation_frequency:
    :param dataset:
    :return:
    """

    _dataset = xarray.Dataset()

    # Copy over all non spectral vars
    for name in dataset:
        _name = str(name)
        if _name not in SPECTRAL_VARS:
            _dataset = _dataset.assign({_name: dataset[_name]})

    coords = {
        str(_coor_name): dataset[str(_coor_name)]
        for _coor_name in dataset[NAME_E].coords
    }
    coords[NAME_F] = interpolation_frequency
    dims = dataset[NAME_E].dims

    # Interpolate energy
    interpolated_cdf_spline = _cdf_interpolate_spline(
        dataset[NAME_F].values,
        dataset[NAME_E].values,
        monotone_interpolation=kwargs.get("monotone_interpolation", True),
        frequency_axis=kwargs.get("frequency_axis", -1),
    )
    interpolated_energy = interpolated_cdf_spline.derivative()(interpolation_frequency)

    _dataset = _dataset.assign(
        {
            NAME_E: xarray.DataArray(
                data=interpolated_energy,
                coords=coords,
                dims=dims,
            )
        }
    )

    msk = interpolated_energy > 0

    for _name in SPECTRAL_MOMENTS:
        interpolated_densities_spline = _cdf_interpolate_spline(
            dataset[NAME_F].values,
            dataset[_name].values * dataset[NAME_E].values,
            monotone_interpolation=kwargs.get("monotone_interpolation_moments", False),
        )
        interpolated_densities = interpolated_densities_spline.derivative()(
            interpolation_frequency
        )
        # Avoid division by zero
        interpolated_densities[msk] = (
            interpolated_densities[msk] / interpolated_energy[msk]
        )

        _dataset = _dataset.assign(
            {
                _name: xarray.DataArray(
                    data=interpolated_densities,
                    coords=coords,
                    dims=dims,
                )
            }
        )

    return _dataset
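A sketch of how this function is typically reached, assuming spectrum is the FrequencySpectrum from the create_1d_spectrum example above: FrequencySpectrum.interpolate_frequency(..., method="spline") fills NaN values and delegates here, but the function can also be called directly on a NaN-free dataset.

import numpy as np

from ocean_science_utilities.wavespectra.spectrum import (
    cumulative_frequency_interpolation_1d_variable,
)

new_frequencies = np.linspace(0.05, 0.5, 72)

# High-level entry point (fills NaN and passes the correct frequency_axis):
regridded = spectrum.interpolate_frequency(new_frequencies, method="spline")

# Direct call on the underlying dataset after removing NaN values:
spectrum.fillna(0.0)
regridded_dataset = cumulative_frequency_interpolation_1d_variable(
    new_frequencies, spectrum.dataset
)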
def fill_zeros_or_nan_in_tail(spectrum: WaveSpectrum, power=None, tail_energy=None, tail_bounds=None) ‑> FrequencySpectrum
Expand source code
def fill_zeros_or_nan_in_tail(
    spectrum: WaveSpectrum,
    power=None,
    tail_energy=None,
    tail_bounds=None,
) -> FrequencySpectrum:
    variance_density = spectrum.e
    a1 = spectrum.a1
    b1 = spectrum.b1
    a2 = spectrum.a2
    b2 = spectrum.b2

    if tail_energy is not None:
        if isinstance(tail_energy, xarray.DataArray):
            tail_energy = tail_energy.values

        tail_information = (tail_bounds, tail_energy)
    else:
        tail_information = None

    variance_density = xarray.DataArray(
        data=numba_fill_zeros_or_nan_in_tail(
            variance_density.values,
            variance_density.frequency.values,
            power,
            tail_information=tail_information,
        ),
        dims=a1.dims,
        coords=a1.coords,
    )

    dataset = xarray.Dataset(
        {
            "variance_density": variance_density,
            "a1": a1,
            "b1": b1,
            "a2": a2,
            "b2": b2,
        }
    )

    for name in spectrum.dataset:
        if name in SPECTRAL_VARS:
            continue
        else:
            dataset = dataset.assign({name: spectrum.dataset[name]})

    return FrequencySpectrum(dataset)
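An illustrative call, assuming spectrum is a FrequencySpectrum whose highest-frequency bins are zero or NaN. Passing power=-5 requests an f**-5 shaped tail; with power=None the choice of tail is left to the helper (compare the extrapolate_tail method on WaveSpectrum below).

from ocean_science_utilities.wavespectra.spectrum import fill_zeros_or_nan_in_tail

# "spectrum" is an assumed FrequencySpectrum with an empty (zero/NaN) tail.
filled = fill_zeros_or_nan_in_tail(spectrum, power=-5)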
def load_spectrum_from_netcdf(filename_or_obj) ‑> Union[FrequencySpectrum, FrequencyDirectionSpectrum]

Load a spectrum from a netCDF file. :param filename_or_obj: :return:

Expand source code
def load_spectrum_from_netcdf(
    filename_or_obj,
) -> Union[FrequencySpectrum, FrequencyDirectionSpectrum]:
    """
    Load a spectrum from a netCDF file
    :param filename_or_obj:
    :return:
    """
    dataset = xarray.open_dataset(filename_or_obj=filename_or_obj)
    if NAME_D in dataset.coords:
        return FrequencyDirectionSpectrum(dataset=dataset)
    else:
        return FrequencySpectrum(dataset=dataset)
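A round-trip sketch, assuming spectrum is any of the spectrum objects built above and that a netCDF backend for xarray is installed; the returned type depends on whether a direction coordinate is present in the file.

from ocean_science_utilities.wavespectra.spectrum import load_spectrum_from_netcdf

spectrum.save_as_netcdf("spectrum_1d.nc")
reloaded = load_spectrum_from_netcdf("spectrum_1d.nc")  # a FrequencySpectrum here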

Classes

class DatasetWrapper (dataset: xarray.core.dataset.Dataset)

A class that wraps a dataset object and passes through some of its primary functionality (get/set etc.). Used here mostly to make explicit what parts of the Dataset interface we actually expose in frequency objects. Note that we do not claim, or try to obtain, completeness here. If the full capabilities of the dataset object are needed we can simply operate directly on the dataset object itself.

Expand source code
class DatasetWrapper:
    """
    A class that wraps a dataset object and passes through some of its primary
    functionality (get/set etc.). Used here mostly to make explicit what parts
    of the Dataset interface we actually expose in frequency objects. Note that
    we do not claim, or try to obtain, completeness here. If the full
    capabilities of the dataset object are needed we can simply operate
    directly on the dataset object itself.
    """

    def __init__(self, dataset: xarray.Dataset):
        self.dataset = dataset

    def __getitem__(self, item) -> xarray.DataArray:
        return self.dataset.__getitem__(item)

    def __setitem__(self, key, value) -> None:
        return self.dataset.__setitem__(key, value)

    def __copy__(self):
        cls = self.__class__
        return cls(self.dataset.copy())

    def __len__(self) -> int:
        return len(self.dataset)

    def copy(self, deep=True):
        if deep:
            return self.__deepcopy__({})
        else:
            return self.__copy__()

    def __deepcopy__(self, memodict):
        cls = self.__class__
        return cls(self.dataset.copy(deep=True))

    def coords(self) -> xarray.core.coordinates.DatasetCoordinates:
        return self.dataset.coords

    def keys(self):
        return self.dataset.keys()

    def __contains__(self, key: object) -> bool:
        return key in self.dataset

    def __iter__(self) -> Iterator[Hashable]:
        return self.dataset.__iter__()

    def sel(self, *args, method="nearest"):
        cls = type(self)
        dataset = xarray.Dataset()
        for var in self.dataset:
            dataset = dataset.assign({var: self.dataset[var].sel(*args, method=method)})
        return cls(dataset=dataset)

    def isel(self, *args, **kwargs):
        cls = type(self)
        dataset = xarray.Dataset()
        for var in self.dataset:
            dataset = dataset.assign({var: self.dataset[var].isel(*args, **kwargs)})
        return cls(dataset=dataset)

Subclasses

Methods

def coords(self) ‑> xarray.core.coordinates.DatasetCoordinates
Expand source code
def coords(self) -> xarray.core.coordinates.DatasetCoordinates:
    return self.dataset.coords
def copy(self, deep=True)
Expand source code
def copy(self, deep=True):
    if deep:
        return self.__deepcopy__({})
    else:
        return self.__copy__()
def isel(self, *args, **kwargs)
Expand source code
def isel(self, *args, **kwargs):
    cls = type(self)
    dataset = xarray.Dataset()
    for var in self.dataset:
        dataset = dataset.assign({var: self.dataset[var].isel(*args, **kwargs)})
    return cls(dataset=dataset)
def keys(self)
Expand source code
def keys(self):
    return self.dataset.keys()
def sel(self, *args, method='nearest')
Expand source code
def sel(self, *args, method="nearest"):
    cls = type(self)
    dataset = xarray.Dataset()
    for var in self.dataset:
        dataset = dataset.assign({var: self.dataset[var].sel(*args, method=method)})
    return cls(dataset=dataset)
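Both sel and isel apply the selection to every data variable and re-wrap the result in the same class. A short sketch, assuming spectrum is a FrequencySpectrum with a time coordinate (e.g. from the create_1d_spectrum example above):

import numpy as np

# Nearest-in-time selection and positional selection of the first spectrum.
closest = spectrum.sel({"time": np.datetime64("2023-01-01T00:30:00")})
first = spectrum.isel(time=0)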
class FrequencyDirectionSpectrum (dataset: xarray.core.dataset.Dataset)

A class that wraps a dataset object and passes through some of its primary functionality (get/set etc.). Used here mostly to make explicit what parts of the Dataset interface we actually expose in frequency objects. Note that we do not claim, or try to obtain, completeness here. If the full capabilities of the dataset object are needed we can simply operate directly on the dataset object itself.

Expand source code
class FrequencyDirectionSpectrum(WaveSpectrum):
    def __init__(self, dataset: xarray.Dataset):
        super(FrequencyDirectionSpectrum, self).__init__(dataset)
        for name in NAMES_2D:
            if name not in dataset and name not in dataset.coords:
                raise ValueError(
                    f"Required variable/coordinate {name} is"
                    f" not specified in the dataset"
                )

    def __len__(self):
        return int(np.prod(self.spectral_values.shape[:-2]))

    @property
    def direction_step(self) -> xarray.DataArray:
        difference = wrapped_difference(
            np.diff(self.direction.values, append=self.direction[0]), period=360
        )
        return xarray.DataArray(
            data=difference, coords={NAME_D: self.direction.values}, dims=[NAME_D]
        )

    @property
    def radian_direction(self) -> xarray.DataArray:
        data_array = self.dataset[NAME_D] * np.pi / 180
        data_array.name = "radian_direction"
        return data_array

    def _directionally_integrate(
        self, data_array: xarray.DataArray
    ) -> xarray.DataArray:
        return (data_array * self.direction_step).sum(NAME_D, skipna=True)

    @property
    def e(self) -> xarray.DataArray:
        return self._directionally_integrate(self.dataset[NAME_E])

    @property
    def a1(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.cos(self.radian_direction)
            )
            / self.e
        )

    @property
    def b1(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.sin(self.radian_direction)
            )
            / self.e
        )

    @property
    def a2(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.cos(2 * self.radian_direction)
            )
            / self.e
        )

    @property
    def b2(self) -> xarray.DataArray:
        return (
            self._directionally_integrate(
                self.dataset[NAME_E] * np.sin(2 * self.radian_direction)
            )
            / self.e
        )

    @property
    def direction(self) -> xarray.DataArray:
        return self.dataset[NAME_D]

    def as_frequency_spectrum(self):
        dataset = {
            "a1": self.a1,
            "b1": self.b1,
            "a2": self.a2,
            "b2": self.b2,
            "variance_density": self.e,
        }
        for name in self.dataset:
            if name not in SPECTRAL_VARS:
                dataset[str(name)] = self.dataset[name]

        return FrequencySpectrum(xarray.Dataset(dataset))

    def spectrum_1d(self):
        """
        Will be deprecated
        :return:
        """
        warn(
            'spectrum_1d method will be removed, use "as_frequency_spectrum" instead',
            DeprecationWarning,
            stacklevel=2,
        )
        return self.as_frequency_spectrum()

    def differentiate(self, coordinate=None, **kwargs) -> "FrequencyDirectionSpectrum":
        if coordinate is None:
            coordinate = "time"

        if coordinate not in self.dataset:
            raise ValueError(f"Coordinate {coordinate} does not exist in the dataset")

        data = {
            NAME_E: (
                self.dims,
                self.variance_density.differentiate(
                    coordinate, datetime_unit="s", **kwargs
                ).values,
            )
        }
        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencyDirectionSpectrum(
            xarray.Dataset(data_vars=data, coords=self.coords())
        )

    @property
    def number_of_directions(self) -> int:
        return len(self.direction)

Ancestors

Instance variables

var direction : xarray.core.dataarray.DataArray
Expand source code
@property
def direction(self) -> xarray.DataArray:
    return self.dataset[NAME_D]
var direction_step : xarray.core.dataarray.DataArray
Expand source code
@property
def direction_step(self) -> xarray.DataArray:
    difference = wrapped_difference(
        np.diff(self.direction.values, append=self.direction[0]), period=360
    )
    return xarray.DataArray(
        data=difference, coords={NAME_D: self.direction.values}, dims=[NAME_D]
    )
var number_of_directions : int
Expand source code
@property
def number_of_directions(self) -> int:
    return len(self.direction)
var radian_direction : xarray.core.dataarray.DataArray
Expand source code
@property
def radian_direction(self) -> xarray.DataArray:
    data_array = self.dataset[NAME_D] * np.pi / 180
    data_array.name = "radian_direction"
    return data_array

Methods

def as_frequency_spectrum(self)
Expand source code
def as_frequency_spectrum(self):
    dataset = {
        "a1": self.a1,
        "b1": self.b1,
        "a2": self.a2,
        "b2": self.b2,
        "variance_density": self.e,
    }
    for name in self.dataset:
        if name not in SPECTRAL_VARS:
            dataset[str(name)] = self.dataset[name]

    return FrequencySpectrum(xarray.Dataset(dataset))
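A brief sketch, assuming spectrum_2d comes from the create_2d_spectrum example above: collapsing to a FrequencySpectrum keeps the directional information as the Fourier moments a1, b1, a2, b2.

spectrum_1d = spectrum_2d.as_frequency_spectrum()
print(spectrum_1d.mean_direction().values)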
def differentiate(self, coordinate=None, **kwargs) ‑> FrequencyDirectionSpectrum
Expand source code
def differentiate(self, coordinate=None, **kwargs) -> "FrequencyDirectionSpectrum":
    if coordinate is None:
        coordinate = "time"

    if coordinate not in self.dataset:
        raise ValueError(f"Coordinate {coordinate} does not exist in the dataset")

    data = {
        NAME_E: (
            self.dims,
            self.variance_density.differentiate(
                coordinate, datetime_unit="s", **kwargs
            ).values,
        )
    }
    for x in self.dataset:
        if x in SPECTRAL_VARS:
            continue
        data[x] = (self.dims_space_time, self.dataset[x].values)

    return FrequencyDirectionSpectrum(
        xarray.Dataset(data_vars=data, coords=self.coords())
    )
def spectrum_1d(self)

Will be deprecated :return:

Expand source code
def spectrum_1d(self):
    """
    Will be deprecated
    :return:
    """
    warn(
        'spectrum_1d method will be removed, use "as_frequency_spectrum" instead',
        DeprecationWarning,
        stacklevel=2,
    )
    return self.as_frequency_spectrum()

Inherited members

class FrequencySpectrum (dataset: xarray.core.dataset.Dataset)

A class that wraps a dataset object and passes through some of its primary functionality (get/set etc.). Used here mostly to make explicit what parts of the Dataset interface we actually expose in frequency objects. Note that we do not claim, or try to obtain, completeness here. If the full capabilities of the dataset object are needed we can simply operate directly on the dataset object itself.

Expand source code
class FrequencySpectrum(WaveSpectrum):
    def __init__(self, dataset: xarray.Dataset):
        super(FrequencySpectrum, self).__init__(dataset)
        for name in NAMES_1D:
            if name not in dataset and name not in dataset.coords:
                raise ValueError(
                    f"Required variable/coordinate {name} is"
                    f" not specified in the dataset"
                )

    def __len__(self):
        return int(np.prod(self.spectral_values.shape[:-1]))

    def interpolate_frequency(
        self: "FrequencySpectrum",
        new_frequencies: Union[xarray.DataArray, np.ndarray],
        extrapolation_value=0.0,
        method: Literal["nearest", "linear", "spline"] = "linear",
        **kwargs,
    ) -> "FrequencySpectrum":
        if isinstance(new_frequencies, xarray.DataArray):
            new_frequencies = new_frequencies.values

        if method == "spline":
            self.fillna(0.0)
            frequency_axis = self.dims.index(NAME_F)
            interpolated_data = cumulative_frequency_interpolation_1d_variable(
                new_frequencies, self.dataset, frequency_axis=frequency_axis, **kwargs
            )
            object = FrequencySpectrum(interpolated_data)
            object.fillna(extrapolation_value)
            return object
        elif method == "linear":
            return self.interpolate(
                {NAME_F: new_frequencies},
                extrapolation_value=extrapolation_value,
                nearest_neighbour=False,
            )
        elif method == "nearest":
            return self.interpolate(
                {NAME_F: new_frequencies},
                extrapolation_value=extrapolation_value,
                nearest_neighbour=True,
            )
        else:
            raise ValueError(f"Unknown interpolation method: {method}")

    def interpolate(
        self: "FrequencySpectrum",
        coordinates,
        extrapolation_value=0.0,
        nearest_neighbour=False,
    ) -> "FrequencySpectrum":
        """

        :param coordinates:
        :return:
        """
        _dataset = xarray.Dataset()
        _moments = [NAME_a1, NAME_b1, NAME_a2, NAME_b2]

        # For physical reasons it is better to interpolate the scaled moments -
        # as opposed to the normalized moments. For the dataset we interpolate
        # we set a1 to A1 etc. Afterwards we scale the output back to the normalized
        # state.
        for name in self.dataset:
            _name = str(name)
            if _name in _moments:
                _dataset = _dataset.assign({_name: getattr(self, _name) * self.e})
            else:
                _dataset = _dataset.assign({_name: self.dataset[_name]})

        interpolated_data = interpolate_dataset_grid(
            coordinates, _dataset, nearest_neighbour
        )
        for name in _moments:
            interpolated_data[name] = (
                interpolated_data[name] / interpolated_data[NAME_E]
            )

        object = FrequencySpectrum(interpolated_data)
        object.fillna(extrapolation_value)
        return object

    def down_sample(self, frequencies):
        cdf = self.cdf()

        frequency_step = midpoint_rule_step(frequencies)
        sampling_frequencies = np.concatenate(([0], np.cumsum(frequency_step)))
        sampling_frequencies = (
            sampling_frequencies - frequency_step[0] / 2 + frequencies[0]
        )

        dims = self.dims
        sampled_cdf = cdf.sel({"frequency": sampling_frequencies}, method="nearest")
        data = {
            NAME_E: (dims, sampled_cdf.diff(dim="frequency").values / frequency_step),
            NAME_a1: (
                dims,
                self.a1.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_b1: (
                dims,
                self.b1.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_a2: (
                dims,
                self.a2.sel({"frequency": frequencies}, method="nearest").values,
            ),
            NAME_b2: (
                dims,
                self.b2.sel({"frequency": frequencies}, method="nearest").values,
            ),
        }

        coords = {x: self.dataset[x].values for x in self.dims}
        coords[NAME_F] = frequencies

        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencySpectrum(xarray.Dataset(data_vars=data, coords=coords))

    def as_frequency_direction_spectrum(
        self,
        number_of_directions,
        method: Estimators = "mem2",
        solution_method="scipy",
    ) -> "FrequencyDirectionSpectrum":
        direction = np.linspace(0, 360, number_of_directions, endpoint=False)

        output_array = (
            estimate_directional_distribution(
                self.a1.values,
                self.b1.values,
                self.a2.values,
                self.b2.values,
                direction,
                method=method,
                solution_method=solution_method,
            )
            * self.e.values[..., None]
        )

        dims = self.dims_space_time + [NAME_F, NAME_D]
        coords = {x: self.dataset[x].values for x in self.dims}
        coords[NAME_D] = direction

        data = {NAME_E: (dims, output_array)}
        for x in self.dataset:
            if x in SPECTRAL_VARS:
                continue
            data[x] = (self.dims_space_time, self.dataset[x].values)

        return FrequencyDirectionSpectrum(xarray.Dataset(data_vars=data, coords=coords))

Ancestors

Methods

def as_frequency_direction_spectrum(self, number_of_directions, method: Literal['mem', 'mem2'] = 'mem2', solution_method='scipy') ‑> FrequencyDirectionSpectrum
Expand source code
def as_frequency_direction_spectrum(
    self,
    number_of_directions,
    method: Estimators = "mem2",
    solution_method="scipy",
) -> "FrequencyDirectionSpectrum":
    direction = np.linspace(0, 360, number_of_directions, endpoint=False)

    output_array = (
        estimate_directional_distribution(
            self.a1.values,
            self.b1.values,
            self.a2.values,
            self.b2.values,
            direction,
            method=method,
            solution_method=solution_method,
        )
        * self.e.values[..., None]
    )

    dims = self.dims_space_time + [NAME_F, NAME_D]
    coords = {x: self.dataset[x].values for x in self.dims}
    coords[NAME_D] = direction

    data = {NAME_E: (dims, output_array)}
    for x in self.dataset:
        if x in SPECTRAL_VARS:
            continue
        data[x] = (self.dims_space_time, self.dataset[x].values)

    return FrequencyDirectionSpectrum(xarray.Dataset(data_vars=data, coords=coords))
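The inverse operation of as_frequency_spectrum, sketched here under the assumption that spectrum_1d has finite directional moments (e.g. the result of the as_frequency_spectrum example above); "mem2" is the default estimator.

spectrum_2d_estimate = spectrum_1d.as_frequency_direction_spectrum(
    number_of_directions=36, method="mem2"
)
print(spectrum_2d_estimate.hm0().values)  # variance is (approximately) preserved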
def down_sample(self, frequencies)
Expand source code
def down_sample(self, frequencies):
    cdf = self.cdf()

    frequency_step = midpoint_rule_step(frequencies)
    sampling_frequencies = np.concatenate(([0], np.cumsum(frequency_step)))
    sampling_frequencies = (
        sampling_frequencies - frequency_step[0] / 2 + frequencies[0]
    )

    dims = self.dims
    sampled_cdf = cdf.sel({"frequency": sampling_frequencies}, method="nearest")
    data = {
        NAME_E: (dims, sampled_cdf.diff(dim="frequency").values / frequency_step),
        NAME_a1: (
            dims,
            self.a1.sel({"frequency": frequencies}, method="nearest").values,
        ),
        NAME_b1: (
            dims,
            self.b1.sel({"frequency": frequencies}, method="nearest").values,
        ),
        NAME_a2: (
            dims,
            self.a2.sel({"frequency": frequencies}, method="nearest").values,
        ),
        NAME_b2: (
            dims,
            self.b2.sel({"frequency": frequencies}, method="nearest").values,
        ),
    }

    coords = {x: self.dataset[x].values for x in self.dims}
    coords[NAME_F] = frequencies

    for x in self.dataset:
        if x in SPECTRAL_VARS:
            continue
        data[x] = (self.dims_space_time, self.dataset[x].values)

    return FrequencySpectrum(xarray.Dataset(data_vars=data, coords=coords))
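A sketch, assuming spectrum is a FrequencySpectrum on a finer grid (e.g. from the create_1d_spectrum example above): because the new densities are computed from differences of the CDF, the total variance is approximately conserved.

coarse_frequencies = spectrum.frequency.values[::2]
coarse = spectrum.down_sample(coarse_frequencies)
print(spectrum.m0().values, coarse.m0().values)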
def interpolate(self: FrequencySpectrum, coordinates, extrapolation_value=0.0, nearest_neighbour=False) ‑> FrequencySpectrum

:param coordinates: :return:

Expand source code
def interpolate(
    self: "FrequencySpectrum",
    coordinates,
    extrapolation_value=0.0,
    nearest_neighbour=False,
) -> "FrequencySpectrum":
    """

    :param coordinates:
    :return:
    """
    _dataset = xarray.Dataset()
    _moments = [NAME_a1, NAME_b1, NAME_a2, NAME_b2]

    # For physical reasons it is better to interpolate the scaled moments -
    # as opposed to the normalized moments. For the dataset we interpolate
    # we set a1 to A1 etc. Afterwards we scale the output back to the normalized
    # state.
    for name in self.dataset:
        _name = str(name)
        if _name in _moments:
            _dataset = _dataset.assign({_name: getattr(self, _name) * self.e})
        else:
            _dataset = _dataset.assign({_name: self.dataset[_name]})

    interpolated_data = interpolate_dataset_grid(
        coordinates, _dataset, nearest_neighbour
    )
    for name in _moments:
        interpolated_data[name] = (
            interpolated_data[name] / interpolated_data[NAME_E]
        )

    object = FrequencySpectrum(interpolated_data)
    object.fillna(extrapolation_value)
    return object
def interpolate_frequency(self: FrequencySpectrum, new_frequencies: Union[xarray.core.dataarray.DataArray, numpy.ndarray], extrapolation_value=0.0, method: Literal['nearest', 'linear', 'spline'] = 'linear', **kwargs) ‑> FrequencySpectrum
Expand source code
def interpolate_frequency(
    self: "FrequencySpectrum",
    new_frequencies: Union[xarray.DataArray, np.ndarray],
    extrapolation_value=0.0,
    method: Literal["nearest", "linear", "spline"] = "linear",
    **kwargs,
) -> "FrequencySpectrum":
    if isinstance(new_frequencies, xarray.DataArray):
        new_frequencies = new_frequencies.values

    if method == "spline":
        self.fillna(0.0)
        frequency_axis = self.dims.index(NAME_F)
        interpolated_data = cumulative_frequency_interpolation_1d_variable(
            new_frequencies, self.dataset, frequency_axis=frequency_axis, **kwargs
        )
        object = FrequencySpectrum(interpolated_data)
        object.fillna(extrapolation_value)
        return object
    elif method == "linear":
        return self.interpolate(
            {NAME_F: new_frequencies},
            extrapolation_value=extrapolation_value,
            nearest_neighbour=False,
        )
    elif method == "nearest":
        return self.interpolate(
            {NAME_F: new_frequencies},
            extrapolation_value=extrapolation_value,
            nearest_neighbour=True,
        )
    else:
        raise ValueError(f"Unknown interpolation method: {method}")
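A sketch comparing the two main modes, assuming spectrum is a FrequencySpectrum as above: "linear" interpolates the energy-scaled moments on the new grid, while "spline" uses the CDF-based scheme of cumulative_frequency_interpolation_1d_variable.

import numpy as np

new_frequencies = np.linspace(0.05, 0.5, 72)
linear = spectrum.interpolate_frequency(new_frequencies, method="linear")
splined = spectrum.interpolate_frequency(new_frequencies, method="spline")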

Inherited members

class WaveSpectrum (dataset: xarray.core.dataset.Dataset)

A class that wraps a dataset object and passes through some of its primary functionality (get/set etc.). Used here mostly to make explicit what parts of the Dataset interface we actually expose in frequency objects. Note that we do not claim, or try to obtain, completeness here. If the full capabilities of the dataset object are needed we can simply operate directly on the dataset object itself.

Expand source code
class WaveSpectrum(DatasetWrapper):
    frequency_units = "Hertz"
    angular_units = "Degrees"
    spectral_density_units = "m**2/Hertz"
    angular_convention = (
        "Wave travel direction (going-to), measured anti-clockwise from East"
    )
    bulk_properties = (
        "m0",
        "hm0",
        "tm01",
        "tm02",
        "peak_period",
        "peak_direction",
        "peak_directional_spread",
        "mean_direction",
        "mean_directional_spread",
        "peak_frequency",
        "peak_wavenumber",
        "latitude",
        "longitude",
        "time",
    )

    def __init__(self, dataset: xarray.Dataset):
        super(WaveSpectrum, self).__init__(dataset)

    def __add__(self, other):
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = spectrum.dataset[NAME_E] + other.dataset[NAME_E]
        return spectrum

    def __sub__(self, other):
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = spectrum.dataset[NAME_E] - other.dataset[NAME_E]
        return spectrum

    def __neg__(self):
        """
        Negate self- that is -spectrum
        :return: spectrum with all spectral values taken to have the opposite
            sign.
        """
        cls = type(self)
        spectrum = cls(self.copy(deep=True).dataset)
        spectrum.dataset[NAME_E] = -spectrum.dataset[NAME_E]
        return spectrum

    def __len__(self) -> int:
        return self.number_of_spectra

    def __getitem__(self, item) -> xarray.DataArray:
        if isinstance(item, tuple):
            if len(item) < self.ndims:
                raise ValueError(
                    "Indexing requires same number of inputs"
                    f"as dimensions: {self.ndims}"
                )
            space_time_index = item[: -len(self.dims_spectral)]
        else:
            if not self.ndims == 1:
                raise ValueError(
                    "Indexing requires same number of inputs"
                    f"as dimensions: {self.ndims}"
                )
            space_time_index = []

        dataset = xarray.Dataset()
        for var in self.dataset:
            if var in SPECTRAL_VARS:
                dataset = dataset.assign({var: self.dataset[var].__getitem__(item)})
            else:
                if space_time_index:
                    # array
                    dataset = dataset.assign(
                        {var: self.dataset[var].__getitem__(space_time_index)}
                    )
                else:
                    # Scalar
                    dataset = dataset.assign({var: self.dataset[var]})

        for coor in dataset.coords:
            if coor not in dataset.dims:
                dataset = dataset.reset_coords(str(coor))

        cls = type(self)
        return cls(dataset)

    @property
    def ndims(self) -> int:
        return len(self.dims)

    @property
    def frequency_step(self) -> xarray.DataArray:
        prepend = 2 * self.frequency[0] - self.frequency[1]
        append = 2 * self.frequency[-1] - self.frequency[-2]
        diff = np.diff(self.frequency, append=append, prepend=prepend)
        return xarray.DataArray(
            data=(diff[0:-1] * 0.5 + diff[1:] * 0.5),
            dims=NAME_F,
            coords={NAME_F: self.frequency},
        )

    def fillna(self, value=0.0):
        for variable in SPECTRAL_VARS:
            if variable in self.dataset:
                self.dataset[variable] = self.dataset[variable].fillna(value)

    def is_invalid(self) -> xarray.DataArray:
        return self.variance_density.isnull().all(dim=self.dims_spectral)

    def is_valid(self) -> xarray.DataArray:
        return ~self.is_invalid()

    def drop_invalid(self):
        return self._apply_filter(self.is_valid())

    def where(self, condition: xarray.DataArray):
        return self._apply_filter(condition)

    def _apply_filter(self, boolean_mask: xarray.DataArray):
        dataset = xarray.Dataset()
        for var in self.dataset:
            data = self.dataset[var].where(
                boolean_mask.reindex_like(self.dataset[var]), drop=True
            )
            dataset = dataset.assign({var: data})

        cls = type(self)
        return cls(dataset)

    def mean(self, dim, skipna=False):
        """
        Calculate the mean value of the spectrum along the given dimension.
        :param dim: dimension to average over
        :param skipna: whether or not to "skip" nan values; if
            True behaves as np.nanmean
        :return:
        """
        if dim in SPECTRAL_DIMS:
            raise ValueError("Cannot calculate mean over spectral dimensions")

        cls = type(self)
        dataset = xarray.Dataset()
        # Todo: fix averaging over longitude for (prime/anti) meridian issues
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].mean(dim=dim, skipna=skipna)})
        return cls(dataset)

    def flatten(self, flattened_coordinate="linear_index"):
        """
        Serialize the non-spectral dimensions creating a single leading dimension
        without a coordinate.
        """

        # Get the current dimensions and shape
        dims = self.dims_space_time
        coords = self.coords_space_time
        shape = self.space_time_shape()
        if len(shape) == 0:
            length = 1
            shape = (1,)
        else:
            length = np.prod(shape)

        # Calculate the flattened shape
        new_shape = (length,)
        new_spectral_shape = (length, *self.spectral_shape())
        new_dims = [flattened_coordinate] + self.dims_spectral

        linear_index = xarray.DataArray(
            data=np.arange(0, length), dims=flattened_coordinate
        )
        indices = np.unravel_index(linear_index.values, shape)

        dataset = {}
        for index, dim in zip(indices, dims):
            dataset[dim] = xarray.DataArray(
                data=coords[dim].values[index], dims=flattened_coordinate
            )

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                x = xarray.DataArray(
                    data=self.dataset[name].values.reshape(new_spectral_shape),
                    dims=new_dims,
                    coords=self.coords_spectral,
                )
            else:
                x = xarray.DataArray(
                    data=self.dataset[name].values.reshape(new_shape),
                    dims=flattened_coordinate,
                )
            dataset[str(name)] = x

        cls = type(self)
        return cls(xarray.Dataset(dataset))

    def sum(self, dim: str, skipna: bool = False):
        """
        Calculate the sum value of the spectrum along the given dimension.
        :param dim: dimension to sum over
        :param skipna: whether or not to "skip" nan values; if True behaves as np.nansum
        :return:
        """

        if dim in SPECTRAL_DIMS:
            raise ValueError("Cannot calculate sum over spectral dimensions")

        cls = type(self)
        dataset = xarray.Dataset()
        # we assign the average coordinate to the dimension we sum over
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].sum(dim=dim, skipna=skipna)})
        return cls(dataset)

    def std(self, dim: str, skipna: bool = False):
        """
        Calculate the standard deviation of the spectrum along the given dimension.
        :param dim: dimension to calculate standard deviation over
        :param skipna: whether or not to "skip" nan values; if True behaves as np.nanstd
        :return:
        """
        if dim in SPECTRAL_DIMS:
            raise ValueError(
                "Cannot calculate standard deviation over spectral dimensions"
            )

        cls = type(self)
        dataset = xarray.Dataset()
        # we assign the average coordinate to the dimension we calculate the std over
        dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
        for x in self.dataset:
            dataset = dataset.assign({x: self.dataset[x].std(dim=dim, skipna=skipna)})
        return cls(dataset)

    def shape(self):
        return self.variance_density.shape

    def spectral_shape(self):
        number_of_spectral_dims = len(self.dims_spectral)
        return self.shape()[-number_of_spectral_dims:]

    def space_time_shape(self):
        number_of_spectral_dims = len(self.dims_spectral)
        return self.shape()[:-number_of_spectral_dims]

    def frequency_moment(self, power: int, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Calculate a "frequency moment" over the given range. A frequency moment
        here refers to the integral:

                    Integral-over-frequency-range[ e(f) * f**power ]

        :param power: power of the frequency
        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: frequency moment
        """
        _range = self._range(fmin, fmax)

        # Integrate dataset over frequencies. Make sure to fill any NaN entries
        # with 0 before the integration.
        return (
            (self.e.isel({NAME_F: _range}) * self.frequency[_range] ** power)
            .fillna(0)
            .integrate(coord=NAME_F)
        )

    @property
    def number_of_frequencies(self) -> int:
        """
        :return: number of frequencies
        """
        return len(self.frequency)

    @property
    def dims_space_time(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims if x not in (SPECTRAL_DIMS)]

    @property
    def coords_space_time(self) -> Mapping[str, xarray.DataArray]:
        return {dim: self.dataset[dim] for dim in self.dims_space_time}

    @property
    def coords_spectral(self) -> Mapping[str, xarray.DataArray]:
        return {dim: self.dataset[dim] for dim in self.dims_spectral}

    @property
    def dims_spectral(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims if x in (SPECTRAL_DIMS)]

    @property
    def dims(self) -> List[str]:
        return [str(x) for x in self.variance_density.dims]

    @property
    def number_of_spectra(self):
        dims = self.dims_space_time
        if dims:
            shape = 1
            for d in dims:
                shape *= len(self.dataset[d])
            return shape
        else:
            return 1

    @property
    def spectral_values(self) -> xarray.DataArray:
        """
        :return: Spectral levels
        """
        return self.dataset[NAME_E]

    @property
    def radian_frequency(self) -> xarray.DataArray:
        """
        :return: Radian frequency
        """
        data_array = self.dataset[NAME_F] * 2 * np.pi
        data_array.name = "radian_frequency"
        return data_array

    @property
    def latitude(self) -> xarray.DataArray:
        """
        :return: latitudes
        """
        return self.dataset[NAME_LAT]

    @property
    def longitude(self) -> xarray.DataArray:
        """
        :return: longitudes
        """
        return self.dataset[NAME_LON]

    @property
    def time(self) -> xarray.DataArray:
        """
        :return: Time
        """
        return self.dataset[NAME_T]

    @property
    def variance_density(self) -> xarray.DataArray:
        """
        :return: Variance density
        """
        return self.dataset[NAME_E]

    @property
    def values(self) -> np.ndarray:
        """
        Get the raw np representation of the wave spectrum
        :return: Numpy ndarray of the wave spectrum.
        """
        return self.dataset[NAME_E].values

    @property
    def e(self) -> xarray.DataArray:
        """
        :return: 1D spectral values (directionally integrated spectrum).
            Equivalent to self.spectral_values if this is a 1D spectrum.
        """
        return self.dataset[NAME_E]

    @property
    def a1(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment cos(theta)
        """
        return self.dataset[NAME_a1]

    @property
    def b1(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment sin(theta)
        """
        return self.dataset[NAME_b1]

    @property
    def a2(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment cos(2*theta)
        """
        return self.dataset[NAME_a2]

    @property
    def b2(self) -> xarray.DataArray:
        """
        :return: normalized Fourier moment sin(2*theta)
        """
        return self.dataset[NAME_b2]

    @property
    def A1(self) -> xarray.DataArray:
        """
        :return: Fourier moment cos(theta)
        """
        return self.a1 * self.e

    @property
    def B1(self) -> xarray.DataArray:
        """
        :return: Fourier moment sin(theta)
        """
        return self.b1 * self.e

    @property
    def A2(self) -> xarray.DataArray:
        """
        :return: Fourier moment cos(2*theta)
        """
        return self.a2 * self.e

    @property
    def B2(self) -> xarray.DataArray:
        """
        :return: Fourier moment sin(2*theta)
        """
        return self.b2 * self.e

    @property
    def frequency(self) -> xarray.DataArray:
        """
        :return: Frequencies (Hz)
        """
        return self.dataset[NAME_F]

    def m0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Zero order frequency moment. Also referred to as variance or energy.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: variance/energy
        """
        return self.frequency_moment(0, fmin, fmax)

    def m1(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        First order frequency moment. Primarily used in calculating a mean
        period measure (Tm01)

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: first order frequency moment.
        """
        return self.frequency_moment(1, fmin, fmax)

    def wave_speed(self) -> xarray.DataArray:
        """
        :return:
        """
        # Note we multiply inverse wavenumber with frequency to force xarray to return
        # a number_of_points by number_of_frequencies data structure.
        return (1 / self.wavenumber) * self.radian_frequency

    def wave_age(self, windspeed):
        return self.peak_wave_speed() / windspeed

    def peak_wave_speed(self) -> xarray.DataArray:
        return 2 * np.pi * self.peak_frequency() / self.peak_wavenumber

    @property
    def wavenumber_density(self) -> xarray.DataArray:
        return self.variance_density * self.group_velocity / (np.pi * 2)

    @property
    def saturation_spectrum(self) -> xarray.DataArray:
        return self.wavenumber_density * self.wavenumber**3

    @property
    def slope_spectrum(self) -> xarray.DataArray:
        return self.variance_density * self.wavenumber**2

    def mean_squared_slope(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        _range = self._range(fmin, fmax)

        # Integrate dataset over frequencies. Make sure to fill any NaN entries with
        # 0 before the integration.
        return (
            self.slope_spectrum.fillna(0).isel({NAME_F: _range}).integrate(coord=NAME_F)
        )

    def m2(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Second order frequency moment. Primarily used in calculating the zero
        crossing period (Tm02)

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Second order frequency moment.
        """
        return self.frequency_moment(2, fmin, fmax)

    def hm0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Significant wave height estimated from the spectrum, i.e. waveheight
        h estimated from variance m0. Common notation in literature.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Significant wave height
        """
        return xarray.DataArray(4 * np.sqrt(self.m0(fmin, fmax)))

    def tm01(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Mean period, estimated as the inverse of the center of mass of the
        spectral curve under the 1d spectrum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Mean period
        """
        return self.m0(fmin, fmax) / self.m1(fmin, fmax)

    def tm02(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Zero crossing period based on Rice's spectral estimate.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: Zero crossing period
        """
        return xarray.DataArray(np.sqrt(self.m0(fmin, fmax) / self.m2(fmin, fmax)))

    def peak_index(self, fmin: float = 0, fmax: float = np.inf) -> xarray.DataArray:
        """
        Index of the peak frequency of the 1d spectrum within the given range
        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: peak indices
        """
        return self.e.where(self._range(fmin, fmax), 0).argmax(dim=NAME_F)

    def peak_frequency(
        self, fmin=0.0, fmax=np.inf, use_spline=False, **kwargs
    ) -> xarray.DataArray:
        """
        Peak frequency of the spectrum, i.e. frequency at which the spectrum
        obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :param use_spline: Use a spline-based interpolation and determine the peak
            frequency from the spline. This allows for a continuous estimate of the
            peak frequency. WARNING: if True, the fmin and fmax parameters are IGNORED.
        :return: peak frequency
        """
        if use_spline:
            if not fmin == 0.0 or np.isfinite(fmax):
                warn(
                    "The fmin and fmax parameters are ignored"
                    "if use_spline is set to True"
                )

            data = spline_peak_frequency(self.frequency.values, self.e.values, **kwargs)
            if len(self.dims_space_time) == 0:
                data = data[0]

            return xarray.DataArray(
                data=data,
                coords=self.coords_space_time,
                dims=self.dims_space_time,
            )
        else:
            return self.dataset[NAME_F][self.peak_index(fmin, fmax)]

    def peak_angular_frequency(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        """
        Peak angular frequency of the spectrum, i.e. the radian frequency at which
        the spectrum obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: peak angular frequency
        """
        return self.peak_frequency(fmin, fmax) * np.pi * 2

    def peak_period(
        self, fmin=0, fmax=np.inf, use_spline=False, **kwargs
    ) -> xarray.DataArray:
        """
        Peak period of the spectrum, i.e. period at which the spectrum
        obtains its maximum.

        :param fmin: minimum frequency
        :param fmax: maximum frequency
        :return: peak period
        """
        peak_period = 1 / self.peak_frequency(
            fmin, fmax, use_spline=use_spline, **kwargs
        )
        peak_period.name = "peak period"
        try:
            peak_period = peak_period.drop("frequency")
        except Exception:
            pass
        return peak_period

    def peak_direction(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        index = self.peak_index(fmin, fmax)
        return self._mean_direction(
            self.a1.isel(**{NAME_F: index}), self.b1.isel(**{NAME_F: index})
        )

    def peak_directional_spread(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
        index = self.peak_index(fmin, fmax)
        a1 = self.a1.isel(**{NAME_F: index})
        b1 = self.b1.isel(**{NAME_F: index})
        return self._spread(a1, b1)

    @staticmethod
    def _mean_direction(a1: xarray.DataArray, b1: xarray.DataArray) -> xarray.DataArray:
        return xarray.DataArray(np.arctan2(b1, a1) * 180 / np.pi)

    @staticmethod
    def _spread(a1: xarray.DataArray, b1: xarray.DataArray) -> xarray.DataArray:
        return xarray.DataArray(
            np.sqrt(2 - 2 * np.sqrt(a1**2 + b1**2)) * 180 / np.pi
        )

    @property
    def mean_direction_per_frequency(self) -> xarray.DataArray:
        return self._mean_direction(self.a1, self.b1)

    @property
    def mean_spread_per_frequency(self) -> xarray.DataArray:
        return self._spread(self.a1, self.b1)

    def _spectral_weighted(self, property: xarray.DataArray, fmin=0, fmax=np.inf):
        range = {NAME_F: self._range(fmin, fmax)}

        property = property.fillna(0)
        return np.trapz(
            property.isel(**range) * self.e.isel(**range), self.frequency[range]
        ) / self.m0(fmin, fmax)

    def mean_direction(self, fmin=0, fmax=np.inf):
        return self._mean_direction(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))

    def mean_directional_spread(self, fmin=0, fmax=np.inf):
        return self._spread(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))

    def mean_a1(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.a1, fmin, fmax)

    def mean_b1(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.b1, fmin, fmax)

    def mean_a2(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.a2, fmin, fmax)

    def mean_b2(self, fmin=0, fmax=np.inf):
        return self._spectral_weighted(self.b2, fmin, fmax)

    @property
    def depth(self) -> xarray.DataArray:
        depth = self.dataset[NAME_DEPTH]
        return xarray.where(depth.isnull(), np.inf, depth)

    @property
    def group_velocity(self) -> xarray.DataArray:
        depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
        k = self.wavenumber.values
        depth = depth * np.ones(k.shape)

        # Construct the output coordinates and dimension of the data array
        return_dimensions = (*self.dims_space_time, NAME_F)
        coords = {}
        for dim in return_dimensions:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=intrinsic_group_velocity(k, depth),
            dims=return_dimensions,
            coords=coords,
        )

    @property
    def wavenumber(self) -> xarray.DataArray:
        """
        Determine the wavenumbers for the frequencies in the spectrum. Note that since
        the dispersion relation depends on depth, the returned wavenumber array has the
        dimensions of the depth array followed by the frequency dimension.

        :return: wavenumbers
        """

        # For numba (used in the dispersion relation) we need raw np arrays of
        # the correct dimension
        depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
        radian_frequency = self.radian_frequency.expand_dims(dim=self.depth.dims).values

        # Broadcasting does not work inside the numba implementation, so we explicitly
        # need to construct arrays of the correct input dimension.
        depth_shape = depth.shape
        radian_frequency_shape = radian_frequency.shape

        depth = depth * np.ones(radian_frequency_shape)
        radian_frequency = np.ones(depth_shape) * radian_frequency

        # Construct the output coordinates and dimension of the data array
        return_dimensions = (*self.dims_space_time, NAME_F)
        coords = {}
        for dim in return_dimensions:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=inverse_intrinsic_dispersion_relation(radian_frequency, depth),
            dims=return_dimensions,
            coords=coords,
        )

    @property
    def wavelength(self) -> xarray.DataArray:
        return 2 * np.pi / self.wavenumber

    @property
    def peak_wavenumber(self) -> xarray.DataArray:
        index = self.peak_index()
        # Construct the output coordinates and dimension of the data array
        coords = {}
        for dim in self.dims_space_time:
            coords[dim] = self.dataset[dim].values

        return xarray.DataArray(
            data=inverse_intrinsic_dispersion_relation(
                self.radian_frequency[index].values, self.depth.values
            ),
            dims=self.dims_space_time,
            coords=coords,
        )

    def bulk_variables(self) -> xarray.Dataset:
        dataset = xarray.Dataset()
        dataset["significant_waveheight"] = self.significant_waveheight
        dataset["mean_period"] = self.mean_period
        dataset["peak_period"] = self.peak_period()
        dataset["peak_direction"] = self.peak_direction()
        dataset["peak_directional_spread"] = self.peak_directional_spread()
        dataset["mean_direction"] = self.mean_direction()
        dataset["mean_directional_spread"] = self.mean_directional_spread()
        dataset["peak_frequency"] = self.peak_frequency()
        dataset["latitude"] = self.latitude
        dataset["longitude"] = self.longitude
        dataset["timestamp"] = self.time
        return dataset

    @property
    def significant_waveheight(self) -> xarray.DataArray:
        return self.hm0()

    @property
    def mean_period(self) -> xarray.DataArray:
        return self.tm01()

    @property
    def zero_crossing_period(self) -> xarray.DataArray:
        return self.tm02()

    def cdf(self) -> xarray.DataArray:
        """

        :return:
        """
        frequency_step = self.frequency_step
        integration_frequencies = np.concatenate(
            ([0], np.cumsum(frequency_step.values))
        )
        integration_frequencies = (
            integration_frequencies
            - frequency_step.values[0] / 2
            + self.frequency.values[0]
        )
        values = (self.variance_density * frequency_step).values

        frequency_axis = self.dims.index(NAME_F)

        cumsum = np.cumsum(values, axis=frequency_axis)
        # cumsum =  self.variance_density.cumulative_integrate(coord=NAME_F)
        # return cumsum
        shape = list(cumsum.shape)
        shape[frequency_axis] = 1

        cumsum = np.concatenate((np.zeros(shape), cumsum), axis=frequency_axis)

        coords = {str(coor): self.coords()[coor].values for coor in self.coords()}
        coords[NAME_F] = integration_frequencies
        return xarray.DataArray(data=cumsum, dims=self.dims, coords=coords)

    def interpolate(
        self,
        coordinates: Dict[str, Union[xarray.DataArray, np.ndarray]],
        extrapolation_value: float = 0.0,
    ):
        dataset = self.__class__(interpolate_dataset_grid(coordinates, self.dataset))
        dataset.fillna(extrapolation_value)
        return dataset

    def extrapolate_tail(
        self,
        end_frequency,
        power=None,
        tail_energy=None,
        tail_bounds=None,
        tail_moments=None,
        tail_frequency=None,
    ) -> "FrequencySpectrum":
        """
        Extrapolate the tail using the given power
        :param end_frequency: frequency to extrapolate to
        :param power: power to use. If None, a best fit -4 or -5 tail is used.
        :return:
        """
        e = self.e
        a1 = self.a1
        b1 = self.b1
        a2 = self.a2
        b2 = self.b2

        frequency = self.frequency.values
        frequency_delta = frequency[-1] - frequency[-2]
        n = int((end_frequency - frequency[-1]) / frequency_delta) + 1

        fstart = frequency[-1] + frequency_delta
        fend = frequency[-1] + n * frequency_delta

        if tail_frequency is None:
            tail_frequency = np.linspace(fstart, fend, n, endpoint=True)

        tail_frequency = xarray.DataArray(
            data=tail_frequency, coords={"frequency": tail_frequency}, dims="frequency"
        )
        variance_density = xarray.concat(
            (e, e.isel(frequency=-1) * xarray.zeros_like(tail_frequency)),
            dim="frequency",
        )

        tail_a1 = a1.isel(frequency=-1) if tail_moments is None else tail_moments["a1"]
        tail_b1 = b1.isel(frequency=-1) if tail_moments is None else tail_moments["b1"]
        tail_a2 = a2.isel(frequency=-1) if tail_moments is None else tail_moments["a2"]
        tail_b2 = b2.isel(frequency=-1) if tail_moments is None else tail_moments["b2"]

        a1 = xarray.concat(
            (a1, tail_a1 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        b1 = xarray.concat(
            (b1, tail_b1 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        a2 = xarray.concat(
            (a2, tail_a2 * xarray.ones_like(tail_frequency)), dim="frequency"
        )
        b2 = xarray.concat(
            (b2, tail_b2 * xarray.ones_like(tail_frequency)), dim="frequency"
        )

        if tail_energy is not None:
            if isinstance(tail_energy, xarray.DataArray):
                tail_energy = tail_energy.values

            tail_information = (tail_bounds, tail_energy)
        else:
            tail_information = None

        variance_density = xarray.DataArray(
            data=numba_fill_zeros_or_nan_in_tail(
                variance_density.values,
                variance_density.frequency.values,
                power,
                tail_information=tail_information,
            ),
            dims=a1.dims,
            coords=a1.coords,
        )

        dataset = xarray.Dataset(
            {
                "variance_density": variance_density,
                "a1": a1,
                "b1": b1,
                "a2": a2,
                "b2": b2,
            }
        )

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                continue
            else:
                dataset = dataset.assign({name: self.dataset[name]})

        return FrequencySpectrum(dataset)

    def bandpass(self, fmin: float = 0, fmax: float = np.inf):
        dataset = xarray.Dataset()

        for name in self.dataset:
            if name in SPECTRAL_VARS:
                data = self.dataset[name].where(
                    (self.frequency >= fmin) & (self.frequency < fmax), drop=True
                )
                dataset = dataset.assign({name: data})
            else:
                dataset = dataset.assign({name: self.dataset[name]})
        cls = type(self)
        return cls(dataset)

    def interpolate_frequency(
        self,
        new_frequencies: Union[xarray.DataArray, np.ndarray],
        extrapolation_value: float = 0.0,
    ):
        obj = self.__class__(
            interpolate_dataset_along_axis(
                new_frequencies, self.dataset, coordinate_name="frequency"
            )
        )
        obj.fillna(extrapolation_value)
        return obj

    def _range(self, fmin=0.0, fmax=np.inf) -> np.ndarray:
        return (self.dataset[NAME_F].values >= fmin) & (
            self.dataset[NAME_F].values < fmax
        )

    def save_as_netcdf(self, path):
        self.dataset.to_netcdf(path)

    def multiply(
        self,
        array: np.ndarray,
        dimensions: Optional[List[str]] = None,
        inplace: bool = False,
    ):
        """
        Multiply the variance density with the given np array. Broadcasting is
        performed automatically if dimensions are provided. If no dimensions are
        provided the array needs to have the exact same shape as the variance
        density array.

        :param array: Array to multiply with the variance density
        :param dimensions: Dimension labels of the array
        :param inplace: If True, modify this object; otherwise operate on a copy
        :return: spectrum object with the multiplied variance density (self if
            inplace is True, otherwise a copy)
        """
        if inplace:
            output = self
        else:
            output = self.copy()

        coords = {}
        shape = array.shape
        if dimensions is None:
            if shape != self.shape():
                raise ValueError(
                    "If no dimensions are provided the array must have the exact same"
                    "shape as the variance density array."
                )

            output.dataset[NAME_E] = self.dataset[NAME_E] * array
            return output

        if len(shape) != len(dimensions):
            raise ValueError(
                "The dimensions of the input array must match the number of"
                "dimension labels"
            )

        for length, dimension in zip(shape, dimensions):
            if dimension not in self.dims:
                raise ValueError(
                    f"Dimension {dimension} not a valid dimension of the"
                    "spectral object."
                )
            coords[dimension] = self.dataset[dimension].values

            if len(self.dataset[dimension].values) != length:
                raise ValueError(
                    f"Array length along the dimension {dimension} does not match the"
                    " length of the coordinate of the same name in the spctral object."
                )

        data = xarray.DataArray(data=array, coords=coords, dims=dimensions)
        output.dataset[NAME_E] = self.dataset[NAME_E] * data
        return output
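
A minimal usage sketch of the class above, assuming `spectrum` is an already
constructed spectrum object of this class (construction is outside the scope of
this excerpt); the output path is hypothetical:

    hs = spectrum.significant_waveheight    # bulk wave height (hm0)
    tp = spectrum.peak_period()             # peak period
    mean_spec = spectrum.mean(dim="time")   # average over an assumed time dimension
    spectrum.save_as_netcdf("spectra.nc")   # hypothetical output path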

Ancestors

Subclasses

Class variables

var angular_convention
var angular_units
var bulk_properties
var frequency_units
var spectral_density_units

Instance variables

var A1 : xarray.core.dataarray.DataArray

:return: Fourier moment cos(theta)

Expand source code
@property
def A1(self) -> xarray.DataArray:
    """
    :return: Fourier moment cos(theta)
    """
    return self.a1 * self.e
var A2 : xarray.core.dataarray.DataArray

:return: Fourier moment cos(2*theta)

Expand source code
@property
def A2(self) -> xarray.DataArray:
    """
    :return: Fourier moment cos(2*theta)
    """
    return self.a2 * self.e
var B1 : xarray.core.dataarray.DataArray

:return: Fourier moment sin(theta)

Expand source code
@property
def B1(self) -> xarray.DataArray:
    """
    :return: Fourier moment sin(theta)
    """
    return self.b1 * self.e
var B2 : xarray.core.dataarray.DataArray

:return: Fourier moment sin(2*theta)

Expand source code
@property
def B2(self) -> xarray.DataArray:
    """
    :return: Fourier moment sin(2*theta)
    """
    return self.b2 * self.e
var a1 : xarray.core.dataarray.DataArray

:return: normalized Fourier moment cos(theta)

Expand source code
@property
def a1(self) -> xarray.DataArray:
    """
    :return: normalized Fourier moment cos(theta)
    """
    return self.dataset[NAME_a1]
var a2 : xarray.core.dataarray.DataArray

:return: normalized Fourier moment cos(2*theta)

Expand source code
@property
def a2(self) -> xarray.DataArray:
    """
    :return: normalized Fourier moment cos(2*theta)
    """
    return self.dataset[NAME_a2]
var b1 : xarray.core.dataarray.DataArray

:return: normalized Fourier moment sin(theta)

Expand source code
@property
def b1(self) -> xarray.DataArray:
    """
    :return: normalized Fourier moment sin(theta)
    """
    return self.dataset[NAME_b1]
var b2 : xarray.core.dataarray.DataArray

:return: normalized Fourier moment sin(2*theta)

Expand source code
@property
def b2(self) -> xarray.DataArray:
    """
    :return: normalized Fourier moment sin(2*theta)
    """
    return self.dataset[NAME_b2]
var coords_space_time : Mapping[str, xarray.core.dataarray.DataArray]
Expand source code
@property
def coords_space_time(self) -> Mapping[str, xarray.DataArray]:
    return {dim: self.dataset[dim] for dim in self.dims_space_time}
var coords_spectral : Mapping[str, xarray.core.dataarray.DataArray]
Expand source code
@property
def coords_spectral(self) -> Mapping[str, xarray.DataArray]:
    return {dim: self.dataset[dim] for dim in self.dims_spectral}
var depth : xarray.core.dataarray.DataArray
Expand source code
@property
def depth(self) -> xarray.DataArray:
    depth = self.dataset[NAME_DEPTH]
    return xarray.where(depth.isnull(), np.inf, depth)
var dims : List[str]
Expand source code
@property
def dims(self) -> List[str]:
    return [str(x) for x in self.variance_density.dims]
var dims_space_time : List[str]
Expand source code
@property
def dims_space_time(self) -> List[str]:
    return [str(x) for x in self.variance_density.dims if x not in (SPECTRAL_DIMS)]
var dims_spectral : List[str]
Expand source code
@property
def dims_spectral(self) -> List[str]:
    return [str(x) for x in self.variance_density.dims if x in (SPECTRAL_DIMS)]
var e : xarray.core.dataarray.DataArray

:return: 1D spectral values (directionally integrated spectrum). Equivalent to self.spectral_values if this is a 1D spectrum.

Expand source code
@property
def e(self) -> xarray.DataArray:
    """
    :return: 1D spectral values (directionally integrated spectrum).
        Equivalent to self.spectral_values if this is a 1D spectrum.
    """
    return self.dataset[NAME_E]
var frequency : xarray.core.dataarray.DataArray

:return: Frequencies (Hz)

Expand source code
@property
def frequency(self) -> xarray.DataArray:
    """
    :return: Frequencies (Hz)
    """
    return self.dataset[NAME_F]
var frequency_step : xarray.core.dataarray.DataArray
Expand source code
@property
def frequency_step(self) -> xarray.DataArray:
    prepend = 2 * self.frequency[0] - self.frequency[1]
    append = 2 * self.frequency[-1] - self.frequency[-2]
    diff = np.diff(self.frequency, append=append, prepend=prepend)
    return xarray.DataArray(
        data=(diff[0:-1] * 0.5 + diff[1:] * 0.5),
        dims=NAME_F,
        coords={NAME_F: self.frequency},
    )
var group_velocity : xarray.core.dataarray.DataArray
Expand source code
@property
def group_velocity(self) -> xarray.DataArray:
    depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
    k = self.wavenumber.values
    depth = depth * np.ones(k.shape)

    # Construct the output coordinates and dimension of the data array
    return_dimensions = (*self.dims_space_time, NAME_F)
    coords = {}
    for dim in return_dimensions:
        coords[dim] = self.dataset[dim].values

    return xarray.DataArray(
        data=intrinsic_group_velocity(k, depth),
        dims=return_dimensions,
        coords=coords,
    )
var latitude : xarray.core.dataarray.DataArray

:return: latitudes

Expand source code
@property
def latitude(self) -> xarray.DataArray:
    """
    :return: latitudes
    """
    return self.dataset[NAME_LAT]
var longitude : xarray.core.dataarray.DataArray

:return: longitudes

Expand source code
@property
def longitude(self) -> xarray.DataArray:
    """
    :return: longitudes
    """
    return self.dataset[NAME_LON]
var mean_direction_per_frequency : xarray.core.dataarray.DataArray
Expand source code
@property
def mean_direction_per_frequency(self) -> xarray.DataArray:
    return self._mean_direction(self.a1, self.b1)
var mean_period : xarray.core.dataarray.DataArray
Expand source code
@property
def mean_period(self) -> xarray.DataArray:
    return self.tm01()
var mean_spread_per_frequency : xarray.core.dataarray.DataArray
Expand source code
@property
def mean_spread_per_frequency(self) -> xarray.DataArray:
    return self._spread(self.a1, self.b1)
var ndims : int
Expand source code
@property
def ndims(self) -> int:
    return len(self.dims)
var number_of_frequencies : int

:return: number of frequencies

Expand source code
@property
def number_of_frequencies(self) -> int:
    """
    :return: number of frequencies
    """
    return len(self.frequency)
var number_of_spectra
Expand source code
@property
def number_of_spectra(self):
    dims = self.dims_space_time
    if dims:
        shape = 1
        for d in dims:
            shape *= len(self.dataset[d])
        return shape
    else:
        return 1
var peak_wavenumber : xarray.core.dataarray.DataArray
Expand source code
@property
def peak_wavenumber(self) -> xarray.DataArray:
    index = self.peak_index()
    # Construct the output coordinates and dimension of the data array
    coords = {}
    for dim in self.dims_space_time:
        coords[dim] = self.dataset[dim].values

    return xarray.DataArray(
        data=inverse_intrinsic_dispersion_relation(
            self.radian_frequency[index].values, self.depth.values
        ),
        dims=self.dims_space_time,
        coords=coords,
    )
var radian_frequency : xarray.core.dataarray.DataArray

:return: Radian frequency

Expand source code
@property
def radian_frequency(self) -> xarray.DataArray:
    """
    :return: Radian frequency
    """
    data_array = self.dataset[NAME_F] * 2 * np.pi
    data_array.name = "radian_frequency"
    return data_array
var saturation_spectrum : xarray.core.dataarray.DataArray
Expand source code
@property
def saturation_spectrum(self) -> xarray.DataArray:
    return self.wavenumber_density * self.wavenumber**3
var significant_waveheight : xarray.core.dataarray.DataArray
Expand source code
@property
def significant_waveheight(self) -> xarray.DataArray:
    return self.hm0()
var slope_spectrum : xarray.core.dataarray.DataArray
Expand source code
@property
def slope_spectrum(self) -> xarray.DataArray:
    return self.variance_density * self.wavenumber**2
var spectral_values : xarray.core.dataarray.DataArray

:return: Spectral levels

Expand source code
@property
def spectral_values(self) -> xarray.DataArray:
    """
    :return: Spectral levels
    """
    return self.dataset[NAME_E]
var time : xarray.core.dataarray.DataArray

:return: Time

Expand source code
@property
def time(self) -> xarray.DataArray:
    """
    :return: Time
    """
    return self.dataset[NAME_T]
var values : numpy.ndarray

Get the raw np representation of the wave spectrum :return: Numpy ndarray of the wave spectrum.

Expand source code
@property
def values(self) -> np.ndarray:
    """
    Get the raw np representation of the wave spectrum
    :return: Numpy ndarray of the wave spectrum.
    """
    return self.dataset[NAME_E].values
var variance_density : xarray.core.dataarray.DataArray

:return: variance density of the wave spectrum

Expand source code
@property
def variance_density(self) -> xarray.DataArray:
    """
    :return: variance density of the wave spectrum
    """
    return self.dataset[NAME_E]
var wavelength : xarray.core.dataarray.DataArray
Expand source code
@property
def wavelength(self) -> xarray.DataArray:
    return 2 * np.pi / self.wavenumber
var wavenumber : xarray.core.dataarray.DataArray

Determine the wavenumbers for the frequencies in the spectrum. Note that since the dispersion relation depends on depth, the returned wavenumber array has the dimensions of the depth array followed by the frequency dimension.

:return: wavenumbers

Expand source code
@property
def wavenumber(self) -> xarray.DataArray:
    """
    Determine the wavenumbers for the frequencies in the spectrum. Note that since
    the dispersion relation depends on depth, the returned wavenumber array has the
    dimensions of the depth array followed by the frequency dimension.

    :return: wavenumbers
    """

    # For numba (used in the dispersion relation) we need raw np arrays of
    # the correct dimension
    depth = self.depth.expand_dims(dim=NAME_F, axis=-1).values
    radian_frequency = self.radian_frequency.expand_dims(dim=self.depth.dims).values

    # Broadcasting does not work inside the numba implementation; we explicitly
    # need to construct arrays of the correct input dimension.
    depth_shape = depth.shape
    radian_frequency_shape = radian_frequency.shape

    depth = depth * np.ones(radian_frequency_shape)
    radian_frequency = np.ones(depth_shape) * radian_frequency

    # Construct the output coordinates and dimension of the data array
    return_dimensions = (*self.dims_space_time, NAME_F)
    coords = {}
    for dim in return_dimensions:
        coords[dim] = self.dataset[dim].values

    return xarray.DataArray(
        data=inverse_intrinsic_dispersion_relation(radian_frequency, depth),
        dims=return_dimensions,
        coords=coords,
    )
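
As a short illustration of the dimensional layout described above (a sketch only,
assuming `spectrum` is an existing spectrum object with a single `time` dimension):

    import numpy as np

    k = spectrum.wavenumber        # dims: (*space/time dims, "frequency"), e.g. ("time", "frequency")
    wavelength = 2 * np.pi / k     # identical to the `wavelength` property above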
var wavenumber_density : xarray.core.dataarray.DataArray
Expand source code
@property
def wavenumber_density(self) -> xarray.DataArray:
    return self.variance_density * self.group_velocity / (np.pi * 2)
var zero_crossing_period : xarray.core.dataarray.DataArray
Expand source code
@property
def zero_crossing_period(self) -> xarray.DataArray:
    return self.tm02()

Methods

def bandpass(self, fmin: float = 0, fmax: float = inf)
Expand source code
def bandpass(self, fmin: float = 0, fmax: float = np.inf):
    dataset = xarray.Dataset()

    for name in self.dataset:
        if name in SPECTRAL_VARS:
            data = self.dataset[name].where(
                (self.frequency >= fmin) & (self.frequency < fmax), drop=True
            )
            dataset = dataset.assign({name: data})
        else:
            dataset = dataset.assign({name: self.dataset[name]})
    cls = type(self)
    return cls(dataset)
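
A usage sketch for bandpass, assuming `spectrum` is an existing spectrum object;
the band edges below are illustrative only:

    swell = spectrum.bandpass(fmin=0.04, fmax=0.1)   # keep 0.04 <= f < 0.1 Hz
    sea = spectrum.bandpass(fmin=0.1)                # keep f >= 0.1 Hz
    hs_swell = swell.hm0()                           # bulk parameters of the band only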
def bulk_variables(self) ‑> xarray.core.dataset.Dataset
Expand source code
def bulk_variables(self) -> xarray.Dataset:
    dataset = xarray.Dataset()
    dataset["significant_waveheight"] = self.significant_waveheight
    dataset["mean_period"] = self.mean_period
    dataset["peak_period"] = self.peak_period()
    dataset["peak_direction"] = self.peak_direction()
    dataset["peak_directional_spread"] = self.peak_directional_spread()
    dataset["mean_direction"] = self.mean_direction()
    dataset["mean_directional_spread"] = self.mean_directional_spread()
    dataset["peak_frequency"] = self.peak_frequency()
    dataset["latitude"] = self.latitude
    dataset["longitude"] = self.longitude
    dataset["timestamp"] = self.time
    return dataset
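
A sketch of typical use, assuming `spectrum` is an existing spectrum object; the
output path is hypothetical:

    bulk = spectrum.bulk_variables()          # xarray.Dataset of bulk parameters
    print(float(bulk["significant_waveheight"].mean()))
    bulk.to_netcdf("bulk_parameters.nc")      # the result is a plain xarray.Dataset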
def cdf(self) ‑> xarray.core.dataarray.DataArray

Cumulative integral of the variance density over frequency, i.e. the variance contained below each frequency. :return: cumulative variance as a function of frequency

Expand source code
def cdf(self) -> xarray.DataArray:
    """

    :return:
    """
    frequency_step = self.frequency_step
    integration_frequencies = np.concatenate(
        ([0], np.cumsum(frequency_step.values))
    )
    integration_frequencies = (
        integration_frequencies
        - frequency_step.values[0] / 2
        + self.frequency.values[0]
    )
    values = (self.variance_density * frequency_step).values

    frequency_axis = self.dims.index(NAME_F)

    cumsum = np.cumsum(values, axis=frequency_axis)
    # cumsum =  self.variance_density.cumulative_integrate(coord=NAME_F)
    # return cumsum
    shape = list(cumsum.shape)
    shape[frequency_axis] = 1

    cumsum = np.concatenate((np.zeros(shape), cumsum), axis=frequency_axis)

    coords = {str(coor): self.coords()[coor].values for coor in self.coords()}
    coords[NAME_F] = integration_frequencies
    return xarray.DataArray(data=cumsum, dims=self.dims, coords=coords)
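
Because the returned array is the cumulative variance, its last value along the
frequency dimension is (up to the choice of integration rule) the total variance
m0. A sketch, assuming `spectrum` exists:

    cdf = spectrum.cdf()
    total_variance = cdf.isel(frequency=-1)   # approximately equal to spectrum.m0()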
def drop_invalid(self)
Expand source code
def drop_invalid(self):
    return self._apply_filter(self.is_valid())
def extrapolate_tail(self, end_frequency, power=None, tail_energy=None, tail_bounds=None, tail_moments=None, tail_frequency=None) ‑> FrequencySpectrum

Extrapolate the tail of the spectrum to the given end frequency using a power-law tail. :param end_frequency: frequency to extrapolate to :param power: power of the tail to use. If None, a best fit -4 or -5 tail is used. :param tail_energy: (optional) energy constraint for the tail :param tail_bounds: (optional) frequency bounds associated with tail_energy :param tail_moments: (optional) directional moments to use in the tail :param tail_frequency: (optional) frequencies at which to evaluate the tail :return: FrequencySpectrum with the extrapolated tail appended

Expand source code
def extrapolate_tail(
    self,
    end_frequency,
    power=None,
    tail_energy=None,
    tail_bounds=None,
    tail_moments=None,
    tail_frequency=None,
) -> "FrequencySpectrum":
    """
    Extrapolate the tail using the given power
    :param end_frequency: frequency to extrapolate to
    :param power: power to use. If None, a best fit -4 or -5 tail is used.
    :return:
    """
    e = self.e
    a1 = self.a1
    b1 = self.b1
    a2 = self.a2
    b2 = self.b2

    frequency = self.frequency.values
    frequency_delta = frequency[-1] - frequency[-2]
    n = int((end_frequency - frequency[-1]) / frequency_delta) + 1

    fstart = frequency[-1] + frequency_delta
    fend = frequency[-1] + n * frequency_delta

    if tail_frequency is None:
        tail_frequency = np.linspace(fstart, fend, n, endpoint=True)

    tail_frequency = xarray.DataArray(
        data=tail_frequency, coords={"frequency": tail_frequency}, dims="frequency"
    )
    variance_density = xarray.concat(
        (e, e.isel(frequency=-1) * xarray.zeros_like(tail_frequency)),
        dim="frequency",
    )

    tail_a1 = a1.isel(frequency=-1) if tail_moments is None else tail_moments["a1"]
    tail_b1 = b1.isel(frequency=-1) if tail_moments is None else tail_moments["b1"]
    tail_a2 = a2.isel(frequency=-1) if tail_moments is None else tail_moments["a2"]
    tail_b2 = b2.isel(frequency=-1) if tail_moments is None else tail_moments["b2"]

    a1 = xarray.concat(
        (a1, tail_a1 * xarray.ones_like(tail_frequency)), dim="frequency"
    )
    b1 = xarray.concat(
        (b1, tail_b1 * xarray.ones_like(tail_frequency)), dim="frequency"
    )
    a2 = xarray.concat(
        (a2, tail_a2 * xarray.ones_like(tail_frequency)), dim="frequency"
    )
    b2 = xarray.concat(
        (b2, tail_b2 * xarray.ones_like(tail_frequency)), dim="frequency"
    )

    if tail_energy is not None:
        if isinstance(tail_energy, xarray.DataArray):
            tail_energy = tail_energy.values

        tail_information = (tail_bounds, tail_energy)
    else:
        tail_information = None

    variance_density = xarray.DataArray(
        data=numba_fill_zeros_or_nan_in_tail(
            variance_density.values,
            variance_density.frequency.values,
            power,
            tail_information=tail_information,
        ),
        dims=a1.dims,
        coords=a1.coords,
    )

    dataset = xarray.Dataset(
        {
            "variance_density": variance_density,
            "a1": a1,
            "b1": b1,
            "a2": a2,
            "b2": b2,
        }
    )

    for name in self.dataset:
        if name in SPECTRAL_VARS:
            continue
        else:
            dataset = dataset.assign({name: self.dataset[name]})

    return FrequencySpectrum(dataset)
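
A usage sketch, assuming `spectrum` is an existing FrequencySpectrum whose last
resolved frequency lies below 1.0 Hz; the end frequency and power are illustrative:

    extended = spectrum.extrapolate_tail(end_frequency=1.0, power=-5)  # f**-5 tail
    extended_fit = spectrum.extrapolate_tail(end_frequency=1.0)        # best-fit -4 or -5 tail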
def fillna(self, value=0.0)
Expand source code
def fillna(self, value=0.0):
    for variable in SPECTRAL_VARS:
        if variable in self.dataset:
            self.dataset[variable] = self.dataset[variable].fillna(value)
def flatten(self, flattened_coordinate='linear_index')

Flatten the non-spectral dimensions into a single leading dimension without a coordinate.

Expand source code
def flatten(self, flattened_coordinate="linear_index"):
    """
    Flatten the non-spectral dimensions into a single leading dimension
    without a coordinate.
    """

    # Get the current dimensions and shape
    dims = self.dims_space_time
    coords = self.coords_space_time
    shape = self.space_time_shape()
    if len(shape) == 0:
        length = 1
        shape = (1,)
    else:
        length = np.prod(shape)

    # Calculate the flattened shape
    new_shape = (length,)
    new_spectral_shape = (length, *self.spectral_shape())
    new_dims = [flattened_coordinate] + self.dims_spectral

    linear_index = xarray.DataArray(
        data=np.arange(0, length), dims=flattened_coordinate
    )
    indices = np.unravel_index(linear_index.values, shape)

    dataset = {}
    for index, dim in zip(indices, dims):
        dataset[dim] = xarray.DataArray(
            data=coords[dim].values[index], dims=flattened_coordinate
        )

    for name in self.dataset:
        if name in SPECTRAL_VARS:
            x = xarray.DataArray(
                data=self.dataset[name].values.reshape(new_spectral_shape),
                dims=new_dims,
                coords=self.coords_spectral,
            )
        else:
            x = xarray.DataArray(
                data=self.dataset[name].values.reshape(new_shape),
                dims=flattened_coordinate,
            )
        dataset[str(name)] = x

    cls = type(self)
    return cls(xarray.Dataset(dataset))
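
A sketch illustrating flatten, assuming `spectrum` has separate space/time
dimensions (e.g. `time`, `latitude`, `longitude`):

    flat = spectrum.flatten()                  # leading "linear_index" dimension
    n = flat.number_of_spectra                 # product of the original space/time sizes
    first = flat.dataset.isel(linear_index=0)  # underlying xarray.Dataset of one spectrum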
def frequency_moment(self, power: int, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Calculate a "frequency moment" over the given range. A frequency moment here refers to the integral:

        Integral-over-frequency-range[ e(f) * f**power ]

:param power: power of the frequency :param fmin: minimum frequency :param fmax: maximum frequency :return: frequency moment

Expand source code
def frequency_moment(self, power: int, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Calculate a "frequency moment" over the given range. A frequency moment
    here refers to the integral:

                Integral-over-frequency-range[ e(f) * f**power ]

    :param power: power of the frequency
    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: frequency moment
    """
    _range = self._range(fmin, fmax)

    # Integrate dataset over frequencies. Make sure to fill any NaN entries
    # with 0 before the integration.
    return (
        (self.e.isel({NAME_F: _range}) * self.frequency[_range] ** power)
        .fillna(0)
        .integrate(coord=NAME_F)
    )
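
A sketch relating the frequency moments to the derived quantities defined in this
class, assuming `spectrum` exists:

    m0 = spectrum.frequency_moment(0)    # identical to spectrum.m0()
    m1 = spectrum.frequency_moment(1)    # identical to spectrum.m1()
    tm01 = m0 / m1                       # equivalent to spectrum.tm01()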
def hm0(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Significant wave height estimated from the spectrum, i.e. waveheight h estimated from variance m0. Common notation in literature.

:param fmin: minimum frequency :param fmax: maximum frequency :return: Significant wave height

Expand source code
def hm0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Significant wave height estimated from the spectrum, i.e. waveheight
    h estimated from variance m0. Common notation in literature.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: Significant wave height
    """
    return xarray.DataArray(4 * np.sqrt(self.m0(fmin, fmax)))
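
The relation implemented here is Hm0 = 4 * sqrt(m0); a quick numerical check as a
sketch, assuming `spectrum` exists:

    import numpy as np

    hs = spectrum.hm0()
    hs_check = 4 * np.sqrt(spectrum.m0())   # identical by construction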
def interpolate(self, coordinates: Dict[str, Union[xarray.core.dataarray.DataArray, numpy.ndarray]], extrapolation_value: float = 0.0)
Expand source code
def interpolate(
    self,
    coordinates: Dict[str, Union[xarray.DataArray, np.ndarray]],
    extrapolation_value: float = 0.0,
):
    dataset = self.__class__(interpolate_dataset_grid(coordinates, self.dataset))
    dataset.fillna(extrapolation_value)
    return dataset
def interpolate_frequency(self, new_frequencies: Union[xarray.core.dataarray.DataArray, numpy.ndarray], extrapolation_value: float = 0.0)
Expand source code
def interpolate_frequency(
    self,
    new_frequencies: Union[xarray.DataArray, np.ndarray],
    extrapolation_value: float = 0.0,
):
    obj = self.__class__(
        interpolate_dataset_along_axis(
            new_frequencies, self.dataset, coordinate_name="frequency"
        )
    )
    obj.fillna(extrapolation_value)
    return obj
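
A sketch of regridding to a new frequency axis, assuming `spectrum` exists; the
target axis below is illustrative:

    import numpy as np

    new_frequencies = np.linspace(0.03, 0.5, 64)
    regridded = spectrum.interpolate_frequency(new_frequencies)   # gaps filled with 0.0
    regridded_nan = spectrum.interpolate_frequency(new_frequencies, extrapolation_value=np.nan)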
def is_invalid(self) ‑> xarray.core.dataarray.DataArray
Expand source code
def is_invalid(self) -> xarray.DataArray:
    return self.variance_density.isnull().all(dim=self.dims_spectral)
def is_valid(self) ‑> xarray.core.dataarray.DataArray
Expand source code
def is_valid(self) -> xarray.DataArray:
    return ~self.is_invalid()
def m0(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Zero order frequency moment. Also referred to as variance or energy.

:param fmin: minimum frequency :param fmax: maximum frequency :return: variance/energy

Expand source code
def m0(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Zero order frequency moment. Also referred to as variance or energy.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: variance/energy
    """
    return self.frequency_moment(0, fmin, fmax)
def m1(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

First order frequency moment. Primarily used in calculating a mean period measure (Tm01)

:param fmin: minimum frequency :param fmax: maximum frequency :return: first order frequency moment.

Expand source code
def m1(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    First order frequency moment. Primarily used in calculating a mean
    period measure (Tm01)

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: first order frequency moment.
    """
    return self.frequency_moment(1, fmin, fmax)
def m2(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Second order frequency moment. Primarily used in calculating the zero crossing period (Tm02)

:param fmin: minimum frequency :param fmax: maximum frequency :return: Second order frequency moment.

Expand source code
def m2(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Second order frequency moment. Primarily used in calculating the zero
    crossing period (Tm02)

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: Second order frequency moment.
    """
    return self.frequency_moment(2, fmin, fmax)
def mean(self, dim, skipna=False)

Calculate the mean value of the spectrum along the given dimension. :param dim: dimension to average over :param skipna: whether or not to "skip" nan values; if True behaves as np.nanmean :return:

Expand source code
def mean(self, dim, skipna=False):
    """
    Calculate the mean value of the spectrum along the given dimension.
    :param dim: dimension to average over
    :param skipna: whether or not to "skip" nan values; if
        True behaves as np.nanmean
    :return:
    """
    if dim in SPECTRAL_DIMS:
        raise ValueError("Cannot calculate mean over spectral dimensions")

    cls = type(self)
    dataset = xarray.Dataset()
    # Todo: fix averaging over longitude for (prime/anti) meridian issues
    dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
    for x in self.dataset:
        dataset = dataset.assign({x: self.dataset[x].mean(dim=dim, skipna=skipna)})
    return cls(dataset)
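
A sketch of averaging along a non-spectral dimension, assuming `spectrum` has a
`time` dimension:

    mean_spectrum = spectrum.mean(dim="time")               # average over time
    mean_spectrum = spectrum.mean(dim="time", skipna=True)  # ignore NaN entries
    # spectrum.mean(dim="frequency") would raise a ValueError (spectral dimension)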
def mean_a1(self, fmin=0, fmax=inf)
Expand source code
def mean_a1(self, fmin=0, fmax=np.inf):
    return self._spectral_weighted(self.a1, fmin, fmax)
def mean_a2(self, fmin=0, fmax=inf)
Expand source code
def mean_a2(self, fmin=0, fmax=np.inf):
    return self._spectral_weighted(self.a2, fmin, fmax)
def mean_b1(self, fmin=0, fmax=inf)
Expand source code
def mean_b1(self, fmin=0, fmax=np.inf):
    return self._spectral_weighted(self.b1, fmin, fmax)
def mean_b2(self, fmin=0, fmax=inf)
Expand source code
def mean_b2(self, fmin=0, fmax=np.inf):
    return self._spectral_weighted(self.b2, fmin, fmax)
def mean_direction(self, fmin=0, fmax=inf)
Expand source code
def mean_direction(self, fmin=0, fmax=np.inf):
    return self._mean_direction(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))
def mean_directional_spread(self, fmin=0, fmax=inf)
Expand source code
def mean_directional_spread(self, fmin=0, fmax=np.inf):
    return self._spread(self.mean_a1(fmin, fmax), self.mean_b1(fmin, fmax))
def mean_squared_slope(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray
Expand source code
def mean_squared_slope(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    _range = self._range(fmin, fmax)

    # Integrate dataset over frequencies. Make sure to fill any NaN entries with
    # 0 before the integration.
    return (
        self.slope_spectrum.fillna(0).isel({NAME_F: _range}).integrate(coord=NAME_F)
    )
def multiply(self, array: numpy.ndarray, dimensions: Optional[List[str]] = None, inplace: bool = False)

Multiply the variance density with the given np array. Broadcasting is performed automatically if dimensions are provided. If no dimensions are provided the array needs to have the exact same shape as the variance density array.

:param array: Array to multiply with the variance density :param dimensions: Dimension labels of the array :param inplace: If True, modify this object; otherwise operate on a copy :return: spectrum object with the multiplied variance density

Expand source code
def multiply(
    self,
    array: np.ndarray,
    dimensions: Optional[List[str]] = None,
    inplace: bool = False,
):
    """
    Multiply the variance density with the given np array. Broadcasting is
    performed automatically if dimensions are provided. If no dimensions are
    provided the array needs to have the exact same shape as the variance
    density array.

    :param array: Array to multiply with the variance density
    :param dimensions: Dimension labels of the array
    :param inplace: If True, modify this object; otherwise operate on a copy
    :return: spectrum object with the multiplied variance density (self if
        inplace is True, otherwise a copy)
    """
    if inplace:
        output = self
    else:
        output = self.copy()

    coords = {}
    shape = array.shape
    if dimensions is None:
        if shape != self.shape():
            raise ValueError(
                "If no dimensions are provided the array must have the exact same"
                "shape as the variance density array."
            )

        output.dataset[NAME_E] = self.dataset[NAME_E] * array
        return output

    if len(shape) != len(dimensions):
        raise ValueError(
            "The dimensions of the input array must match the number of"
            "dimension labels"
        )

    for length, dimension in zip(shape, dimensions):
        if dimension not in self.dims:
            raise ValueError(
                f"Dimension {dimension} not a valid dimension of the"
                "spectral object."
            )
        coords[dimension] = self.dataset[dimension].values

        if len(self.dataset[dimension].values) != length:
            raise ValueError(
                f"Array length along the dimension {dimension} does not match the"
                " length of the coordinate of the same name in the spctral object."
            )

    data = xarray.DataArray(data=array, coords=coords, dims=dimensions)
    output.dataset[NAME_E] = self.dataset[NAME_E] * data
    return output
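
A sketch of broadcasting a frequency-dependent window onto the variance density,
assuming `spectrum` exists; the window itself is illustrative:

    import numpy as np

    window = np.ones(spectrum.number_of_frequencies)
    window[:3] = 0.0                                    # e.g. suppress the lowest bins
    filtered = spectrum.multiply(window, dimensions=["frequency"])     # returns a copy
    spectrum.multiply(window, dimensions=["frequency"], inplace=True)  # or modify in place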
def peak_angular_frequency(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Peak frequency of the spectrum, i.e. frequency at which the spectrum obtains its maximum.

:param fmin: minimum frequency :param fmax: maximum frequency :return: peak frequency

Expand source code
def peak_angular_frequency(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Peak frequency of the spectrum, i.e. frequency at which the spectrum
    obtains its maximum.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: peak frequency
    """
    return self.peak_frequency(fmin, fmax) * np.pi * 2
def peak_direction(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray
Expand source code
def peak_direction(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    index = self.peak_index(fmin, fmax)
    return self._mean_direction(
        self.a1.isel(**{NAME_F: index}), self.b1.isel(**{NAME_F: index})
    )
def peak_directional_spread(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray
Expand source code
def peak_directional_spread(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    index = self.peak_index(fmin, fmax)
    a1 = self.a1.isel(**{NAME_F: index})
    b1 = self.b1.isel(**{NAME_F: index})
    return self._spread(a1, b1)
def peak_frequency(self, fmin=0.0, fmax=inf, use_spline=False, **kwargs) ‑> xarray.core.dataarray.DataArray

Peak frequency of the spectrum, i.e. frequency at which the spectrum obtains its maximum.

:param fmin: minimum frequency :param fmax: maximum frequency :param use_spline: Use a spline based interpolation and determine the peak frequency from the spline. This allows for a continuous estimate of the peak frequency. WARNING: if True the fmin and fmax parameters are IGNORED :return: peak frequency

Expand source code
def peak_frequency(
    self, fmin=0.0, fmax=np.inf, use_spline=False, **kwargs
) -> xarray.DataArray:
    """
    Peak frequency of the spectrum, i.e. frequency at which the spectrum
    obtains its maximum.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :param use_spline: Use a spline based interpolation and determine the peak
        frequency from the spline. This allows for a continuous estimate of the
        peak frequency. WARNING: if True the fmin and fmax parameters are IGNORED.
    :return: peak frequency
    """
    if use_spline:
        if not fmin == 0.0 or np.isfinite(fmax):
            warn(
                "The fmin and fmax parameters are ignored"
                "if use_spline is set to True"
            )

        data = spline_peak_frequency(self.frequency.values, self.e.values, **kwargs)
        if len(self.dims_space_time) == 0:
            data = data[0]

        return xarray.DataArray(
            data=data,
            coords=self.coords_space_time,
            dims=self.dims_space_time,
        )
    else:
        return self.dataset[NAME_F][self.peak_index(fmin, fmax)]
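
A sketch contrasting the discrete and spline-based estimates, assuming `spectrum`
exists:

    fp = spectrum.peak_frequency()                        # frequency of the maximum bin
    fp_smooth = spectrum.peak_frequency(use_spline=True)  # continuous spline-based estimate
    tp = 1 / fp                                           # equivalent to spectrum.peak_period()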
def peak_index(self, fmin: float = 0, fmax: float = inf) ‑> xarray.core.dataarray.DataArray

Index of the peak frequency of the 1d spectrum within the given range :param fmin: minimum frequency :param fmax: maximum frequency :return: peak indices

Expand source code
def peak_index(self, fmin: float = 0, fmax: float = np.inf) -> xarray.DataArray:
    """
    Index of the peak frequency of the 1d spectrum within the given range
    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: peak indices
    """
    return self.e.where(self._range(fmin, fmax), 0).argmax(dim=NAME_F)
def peak_period(self, fmin=0, fmax=inf, use_spline=False, **kwargs) ‑> xarray.core.dataarray.DataArray

Peak period of the spectrum, i.e. period at which the spectrum obtains its maximum.

:param fmin: minimum frequency :param fmax: maximum frequency :return: peak period

Expand source code
def peak_period(
    self, fmin=0, fmax=np.inf, use_spline=False, **kwargs
) -> xarray.DataArray:
    """
    Peak period of the spectrum, i.e. period at which the spectrum
    obtains its maximum.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: peak period
    """
    peak_period = 1 / self.peak_frequency(
        fmin, fmax, use_spline=use_spline, **kwargs
    )
    peak_period.name = "peak period"
    try:
        peak_period = peak_period.drop("frequency")
    except Exception:
        pass
    return peak_period
def peak_wave_speed(self) ‑> xarray.core.dataarray.DataArray
Expand source code
def peak_wave_speed(self) -> xarray.DataArray:
    return 2 * np.pi * self.peak_frequency() / self.peak_wavenumber
def save_as_netcdf(self, path)
Expand source code
def save_as_netcdf(self, path):
    self.dataset.to_netcdf(path)
def shape(self)
Expand source code
def shape(self):
    return self.variance_density.shape
def space_time_shape(self)
Expand source code
def space_time_shape(self):
    number_of_spectral_dims = len(self.dims_spectral)
    return self.shape()[:-number_of_spectral_dims]
def spectral_shape(self)
Expand source code
def spectral_shape(self):
    number_of_spectral_dims = len(self.dims_spectral)
    return self.shape()[-number_of_spectral_dims:]
def std(self, dim: str, skipna: bool = False)

Calculate the standard deviation of the spectrum along the given dimension. :param dim: dimension to calculate standard deviation over :param skipna: whether or not to "skip" nan values; if True behaves as np.nanstd :return:

Expand source code
def std(self, dim: str, skipna: bool = False):
    """
    Calculate the standard deviation of the spectrum along the given dimension.
    :param dim: dimension to calculate standard deviation over
    :param skipna: whether or not to "skip" nan values; if True behaves as np.nanstd
    :return:
    """
    if dim in SPECTRAL_DIMS:
        raise ValueError(
            "Cannot calculate standard deviation over spectral dimensions"
        )

    cls = type(self)
    dataset = xarray.Dataset()
    # we assign the average coordinate to the dimension we calculate the std over
    dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
    for x in self.dataset:
        dataset = dataset.assign({x: self.dataset[x].std(dim=dim, skipna=skipna)})
    return cls(dataset)
def sum(self, dim: str, skipna: bool = False)

Calculate the sum value of the spectrum along the given dimension. :param dim: dimension to sum over :param skipna: whether or not to "skip" nan values; if True behaves as np.nansum :return:

Expand source code
def sum(self, dim: str, skipna: bool = False):
    """
    Calculate the sum value of the spectrum along the given dimension.
    :param dim: dimension to sum over
    :param skipna: whether or not to "skip" nan values; if True behaves as np.nansum
    :return:
    """

    if dim in SPECTRAL_DIMS:
        raise ValueError("Cannot calculate sum over spectral dimensions")

    cls = type(self)
    dataset = xarray.Dataset()
    # we assign the average coordinate to the dimension we sum over
    dataset = dataset.assign({dim: self.dataset[dim].mean(dim=dim, skipna=skipna)})
    for x in self.dataset:
        dataset = dataset.assign({x: self.dataset[x].sum(dim=dim, skipna=skipna)})
    return cls(dataset)
def tm01(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Mean period, estimated as the inverse of the centroid frequency (the center of mass of the 1d spectrum).

:param fmin: minimum frequency :param fmax: maximum frequency :return: Mean period

Expand source code
def tm01(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Mean period, estimated as the inverse of the centroid frequency (the center
    of mass of the 1d spectrum).

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: Mean period
    """
    return self.m0(fmin, fmax) / self.m1(fmin, fmax)
def tm02(self, fmin=0, fmax=inf) ‑> xarray.core.dataarray.DataArray

Zero crossing period based on Rice's spectral estimate.

:param fmin: minimum frequency :param fmax: maximum frequency :return: Zero crossing period

Expand source code
def tm02(self, fmin=0, fmax=np.inf) -> xarray.DataArray:
    """
    Zero crossing period based on Rice's spectral estimate.

    :param fmin: minimum frequency
    :param fmax: maximum frequency
    :return: Zero crossing period
    """
    return xarray.DataArray(np.sqrt(self.m0(fmin, fmax) / self.m2(fmin, fmax)))
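
The two period measures tm01 and tm02 above are simple moment ratios; a sketch,
assuming `spectrum` exists:

    import numpy as np

    tm01 = spectrum.m0() / spectrum.m1()           # identical to spectrum.tm01()
    tm02 = np.sqrt(spectrum.m0() / spectrum.m2())  # identical to spectrum.tm02()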
def wave_age(self, windspeed)
Expand source code
def wave_age(self, windspeed):
    return self.peak_wave_speed() / windspeed
def wave_speed(self) ‑> xarray.core.dataarray.DataArray

:return: wave phase speed (radian frequency divided by wavenumber)

Expand source code
def wave_speed(self) -> xarray.DataArray:
    """
    :return: wave phase speed (radian frequency divided by wavenumber)
    """
    # Note we multiply the inverse wavenumber with the radian frequency to force
    # xarray to return a number_of_points by number_of_frequencies data structure.
    return (1 / self.wavenumber) * self.radian_frequency
def where(self, condition: xarray.core.dataarray.DataArray)
Expand source code
def where(self, condition: xarray.DataArray):
    return self._apply_filter(condition)