Expand source code
import numpy as np
from ocean_science_utilities.wavespectra.estimators.utils import (
get_direction_increment,
get_rhs,
get_constraint_matrix,
)
try:
from qpsolvers import solve_ls # type: ignore
except ImportError:
pass
def log_likelyhood(
directions_radians: np.ndarray,
a1: np.ndarray,
b1: np.ndarray,
a2: np.ndarray,
b2: np.ndarray,
progress,
**kwargs
) -> np.ndarray:
number_of_frequencies = a1.shape[-1]
number_of_points = a1.shape[0]
directional_distribution = np.zeros(
(number_of_points, number_of_frequencies, len(directions_radians))
)
for ipoint in range(0, number_of_points):
progress.update(1)
directional_distribution[ipoint, :, :] = _log_likelyhood(
directions_radians,
a1[
ipoint,
:,
],
b1[ipoint, :],
a2[ipoint, :],
b2[ipoint, :],
)
return directional_distribution
def _log_likelyhood(directions_radians: np.ndarray, a1, b1, a2, b2) -> np.ndarray:
"""
Return the directional distribution that minimizes the variance (D**2)
constrained by given observed directional moments,
:param directions_radians: 1d array of wave directions in radians,
length[number_of_directions]
:param a1: 1d array of cosine directional moment as function of frequency,
length [number_of_frequencies]
:param b1: 1d array of sine directional moment as function of frequency,
length [number_of_frequencies]
:param a2: 1d array of double angle cosine directional moment as function
of frequency, length [number_of_frequencies]
:param b2: 1d array of double angle sine directional moment as function of
frequency, length [number_of_frequencies]
:return: array with shape [number_of_frequencies,number_of_direction]
representing the directional distribution of the waves at each frequency.
Minimize the variance of the solution:
integrate D**2 over directions
such that the resulting distribution D reproduces the observed moments.
Implementation notes:
- we formulate the problem as a standard Quadratic Programming problem which
can them be solved efficiently with the qpsolvers package.
"""
a1 = np.atleast_1d(a1)
b1 = np.atleast_1d(b1)
a2 = np.atleast_1d(a2)
b2 = np.atleast_1d(b2)
number_of_frequencies = len(a1)
directional_distribution = np.zeros(
(number_of_frequencies, len(directions_radians))
)
constraint_matrix = get_constraint_matrix(directions_radians)
rhs = get_rhs(a1, b1, a2, b2)
identity_matrix = np.diag(np.ones_like(directions_radians), 0)
zeros = np.zeros_like(directions_radians)
direction_increment = get_direction_increment(directions_radians)
upperbound = np.ones_like(directions_radians) / direction_increment.min()
for ifreq in range(0, number_of_frequencies):
res = solve_ls(
R=identity_matrix,
s=zeros,
# minimizing |Rx-b|**2
lb=zeros,
ub=upperbound,
# lb: non-negative; ub: binwidth * ub = 1
A=constraint_matrix,
b=rhs[ifreq, :],
verbose=False
# with hard constraint that Ax=b
)
if res is None:
raise Exception("No solution")
directional_distribution[ifreq, :] = res
directional_distribution[directional_distribution < 0] = 0
return np.squeeze(directional_distribution)