Module ocean_science_utilities.wavespectra.estimators.mem2
Implementation of the "MEM2" method:
see Kim1995:
Kim, T., Lin, L. H., & Wang, H. (1995). Application of maximum entropy method
to the real sea data. In Coastal Engineering 1994 (pp. 340-355).
link: <https://icce-ojs-tamu.tdl.org/icce/index.php/icce/article/download/4967/4647>
(working as of May 29, 2022)
and references therein.
Expand source code
"""
Implementation of the "MEM2" method:
see Kim1995:
Kim, T., Lin, L. H., & Wang, H. (1995). Application of maximum entropy method
to the real sea data. In Coastal Engineering 1994 (pp. 340-355).
link: https://icce-ojs-tamu.tdl.org/icce/index.php/icce/article/download/4967/4647
(working as of May 29, 2022)
and references therein.
"""
import numpy as np
import numba # type: ignore
import scipy # type: ignore
from numba_progress import ProgressBar # type: ignore
from ocean_science_utilities.wavespectra.estimators.mem import numba_mem
from ocean_science_utilities.wavespectra.estimators.utils import get_direction_increment
# Settings for numba JIT compilation- whether to use fast math and parallel
# optimizations when possible.
_FASTMATH = True
_PARALLEL = False
# Numerical settings used in solving for the mem2 distribution
NUMERICS = {
# absolute tolerence stopping criterium. let moment = [ a1,b1,a2,b2] and let
# iterate_moment contain the moments calculated from the current estmitated
# distribution.
# The stopping criterium is:
# np.linalg.norm( moment-iterate_moment ) < atol
"atol": 0.01,
# Maximum number of iterations
"max_iter": 100,
# Maximum number of subiterations in the line search algorithm. Typically deep line
# search activates only when the convergence is poor anyway.
"max_line_search_depth": 8,
# If we fall back to least squares estimate of the newton update we have an
# ill-conditioned system, and solve the system approximately removing the
# smallest singular values. rcond it the ration of smallest divided by largest
# singular value.
"rcond": 1e-6,
# Convergence is mostly (based on limited testing) poor for narrow distributions
# (large lagrange multipliers). If
# we fail to converge we fall back to the mem estimate which has no such issues.
# For narrow distributions this is
# hopefully fine.
"use_mem_when_failing_to_converge": True,
}
# Entry Function
# =============================================================================
def mem2(
directions_radians: np.ndarray,
a1: np.ndarray,
b1: np.ndarray,
a2: np.ndarray,
b2: np.ndarray,
progress_bar: ProgressBar = None,
solution_method="newton",
solver_config=None,
) -> np.ndarray:
"""
:param directions_radians:
:param a1:
:param b1:
:param a2:
:param b2:
:param solution_method:
:return:
"""
if solver_config is None:
solver_config = NUMERICS
else:
solver_config = NUMERICS | solver_config
if solution_method == "scipy":
func = mem2_scipy_root_finder
kwargs = {}
elif solution_method == "newton":
func = mem2_newton
numba_solver_config = numba.typed.Dict.empty(
key_type=numba.core.types.unicode_type, value_type=numba.core.types.float64
)
for key in solver_config:
numba_solver_config[key] = solver_config[key]
kwargs = {"config": numba_solver_config}
elif solution_method == "approximate":
func = mem2_newton
kwargs = {"approximate": True}
else:
raise ValueError("Unknown method")
return func(directions_radians, a1, b1, a2, b2, progress_bar, **kwargs)
# Scipy Implementation
# =============================================================================
def mem2_scipy_root_finder(
directions_radians: np.ndarray,
a1: np.ndarray,
b1: np.ndarray,
a2: np.ndarray,
b2: np.ndarray,
progress,
**kwargs
) -> np.ndarray:
"""
Return the directional distribution that maximizes Shannon [ - D log(D) ]
enthrophy constrained by given observed directional moments,
:param directions_radians: 1d array of wave directions in radians,
length[number_of_directions]
:param a1: 1d array of cosine directional moment as function of frequency,
length [number_of_frequencies]
:param b1: 1d array of sine directional moment as function of frequency,
length [number_of_frequencies]
:param a2: 1d array of double angle cosine directional moment as function
of frequency, length [number_of_frequencies]
:param b2: 1d array of double angle sine directional moment as function of
frequency, length [number_of_frequencies]
:return: array with shape [number_of_frequencies,number_of_direction]
representing the directional distribution of the waves at each frequency.
Maximize the enthrophy of the solution with entrophy defined as:
integrate - D * log(D) over directions
such that the resulting distribution D reproduces the observed moments.
"""
number_of_frequencies = a1.shape[-1]
number_of_points = a1.shape[0]
directional_distribution = np.zeros(
(number_of_points, number_of_frequencies, len(directions_radians))
)
direction_increment = get_direction_increment(directions_radians)
twiddle_factors = np.empty((4, len(directions_radians)))
twiddle_factors[0, :] = np.cos(directions_radians)
twiddle_factors[1, :] = np.sin(directions_radians)
twiddle_factors[2, :] = np.cos(2 * directions_radians)
twiddle_factors[3, :] = np.sin(2 * directions_radians)
guess = initial_value(a1, b1, a2, b2)
for ipoint in range(0, number_of_points):
progress.update(1)
for ifreq in range(0, number_of_frequencies):
#
moments = np.array(
[
a1[ipoint, ifreq],
b1[ipoint, ifreq],
a2[ipoint, ifreq],
b2[ipoint, ifreq],
]
)
if np.any(np.isnan(guess[ipoint, ifreq, :])):
continue
res = scipy.optimize.root(
moment_constraints,
guess[ipoint, ifreq, :],
args=(twiddle_factors, moments, direction_increment),
method="lm",
)
lambas = res.x
directional_distribution[ipoint, ifreq, :] = mem2_directional_distribution(
lambas, direction_increment, twiddle_factors
)
return directional_distribution
# Numba Implementation
# =============================================================================
# spatial iteration
# ---------------------
# To note; enabling caching seems to not play nice with paralel
@numba.njit(parallel=_PARALLEL, cache=(not _PARALLEL))
def mem2_newton(
directions_radians: np.ndarray,
a1: np.ndarray,
b1: np.ndarray,
a2: np.ndarray,
b2: np.ndarray,
progress_bar: ProgressBar = None,
config: numba.typed.Dict = None,
approximate: bool = False,
) -> np.ndarray:
"""
Return the directional distribution that maximizes Shannon [ - D log(D) ]
enthrophy constrained by given observed directional moments.
:param directions_radians: 1d array of wave directions in radians,
length[number_of_directions]
:param a1: 1d array of cosine directional moment as function of
position and frequency,
shape = ( number_of_points,number_of_frequencies)
:param b1: 1d array of sine directional moment as function of
position and frequency,
shape = ( number_of_points,number_of_frequencies)
:param a2: 1d array of double angle cosine directional moment as function of
position and frequency,
shape = ( number_of_points,number_of_frequencies)
:param b2: 1d array of double angle sine directional moment as function of
position and frequency,
shape = ( number_of_points,number_of_frequencies)
:param progress_bar: Progress bar instance if updates are desired.
:return: array with shape [
numbrt_of_points,
number_of_frequencies,
number_of_direction
]
representing the directional distribution of the waves at each frequency.
Maximize the enthrophy of the solution with entrophy defined as:
integrate - D * log(D) over directions
such that the resulting distribution D reproduces the observed moments.
"""
number_of_frequencies = a1.shape[-1]
number_of_points = a1.shape[0]
directional_distribution = np.zeros(
(number_of_points, number_of_frequencies, len(directions_radians))
)
direction_increment_downward_difference = (
directions_radians - np.roll(directions_radians, 1) + np.pi
) % (2 * np.pi) - np.pi
direction_increment_upward_difference = (
-(directions_radians - np.roll(directions_radians, -1) + np.pi) % (2 * np.pi)
- np.pi
)
direction_increment = (
direction_increment_downward_difference + direction_increment_upward_difference
) / 2
# Calculate the needed Fourier transform twiddle factors to calculate moments.
twiddle_factors = np.empty((4, len(directions_radians)))
twiddle_factors[0, :] = np.cos(directions_radians)
twiddle_factors[1, :] = np.sin(directions_radians)
twiddle_factors[2, :] = np.cos(2 * directions_radians)
twiddle_factors[3, :] = np.sin(2 * directions_radians)
guess = initial_value(a1, b1, a2, b2)
for ipoint in numba.prange(0, number_of_points):
if progress_bar is not None:
progress_bar.update(1)
# Note; entries to directional_distribution[ipoint, :, :] is modified in the
# call below. This avoids creation of memory for the resulting array at the
# expense of allowing for side-effects.
_mem2_newton_point(
directional_distribution[ipoint, :, :],
a1[ipoint, :],
b1[ipoint, :],
a2[ipoint, :],
b2[ipoint, :],
guess[ipoint, :, :],
direction_increment,
twiddle_factors,
config,
approximate,
)
return directional_distribution
# frequency iteration
# ----------------------
@numba.njit(cache=True)
def _mem2_newton_point(
out,
a1,
b1,
a2,
b2,
guess,
direction_increment,
twiddle_factors,
config=None,
approximate=False,
):
"""
:param out: a (view) of the array that will containt the output
:param a1: 1d array of cosine directional moment as function of frequency,
:param b1: 1d array of sine directional moment as function of frequency,
:param a2: 1d array of double angle cosine directional moment
as function of frequency,
:param b2: 1d array of double angle sine directional moment
as function of frequency,
:param guess: initial guess of the lagrange multipliers
:param direction_increment: directional stepsize used in the integration, nd-array
:param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta]
as a 4 by ndir array
:param config: numerical settings, see description at NUMERICS at top of file.
:param approximate: whether or not to use the approximate relations.
:return: None - we use side-effects to pass the results back to the
caller (modifying out)
"""
number_of_frequencies = a1.shape[0]
for ifreq in range(0, number_of_frequencies):
#
moments = np.array([a1[ifreq], b1[ifreq], a2[ifreq], b2[ifreq]])
out[ifreq, :] = mem2_newton_solver(
moments,
guess[ifreq, :],
direction_increment,
twiddle_factors,
config,
approximate,
)
# mem2 numerical solver
# ----------------------
@numba.njit(cache=True)
def mem2_newton_solver(
moments: np.ndarray,
guess: np.ndarray,
direction_increment: np.ndarray,
twiddle_factors: np.ndarray,
config=None,
approximate=False,
) -> np.ndarray:
"""
Newton iteration to find the solution to the non-linear system of constraint
equations defining the lagrange multipliers in the MEM2 method. Because the
Lagrange multipliers enter the equations as exponents the system can
be unstable to solve numerically.
:param moments: the normalized directional moments [a1,b1,a2,b2]
:param guess: first guess for the lagrange multipliers (ndarray, length 4)
:param direction_increment: directional stepsize used in the integration, nd-array
:param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a
4 by ndir array
:param config: numerical settings, see description at NUMERICS at top of file.
:param approximate: whether or not to use the approximate relations.
:return:
"""
if config is None:
max_iter = 100
rcond = 1e-6
atol = 0.01
max_line_search_depth = 8
use_mem_when_failing_to_converge = True
else:
max_iter = config["max_iter"]
rcond = config["rcond"]
atol = config["atol"]
max_line_search_depth = config["max_line_search_depth"]
use_mem_when_failing_to_converge = (
config["use_mem_when_failing_to_converge"] > 0.0
)
directional_distribution = np.empty(len(direction_increment))
if np.any(np.isnan(guess)):
directional_distribution[:] = 0
return directional_distribution
if approximate:
directional_distribution[:] = mem2_directional_distribution(
guess, direction_increment, twiddle_factors
)
return directional_distribution
current_iterate = guess
current_func = moment_constraints(
current_iterate,
twiddle_factors,
moments,
direction_increment,
)
jacobian = np.empty((4, 4))
convergence = False
for iter in range(0, max_iter):
# Stopping criterium
magnitude_cur_func_eval = np.linalg.norm(current_func)
if magnitude_cur_func_eval < atol:
convergence = True
break
#
# Compute jacobian, and find newton iterate innovation as we solve for:
#
# jacobian @ delta = - current_iterate_func_eval
#
# with:
#
# delta = next_lagrange_multiplier_iterate-cur_lagrange_multiplier_iterate
jacobian = mem2_jacobian(
current_iterate, twiddle_factors, direction_increment, jacobian
)
try:
update_iterate = solve_cholesky(jacobian, -current_func)
except Exception:
update_iterate = np.linalg.lstsq(jacobian, -current_func, rcond=rcond)[0]
magnitude_current_iterate = np.linalg.norm(current_iterate)
magnitude_update = np.linalg.norm(update_iterate)
# Do a line search for the optimum decrease. This is intended to stabilize the
# algorithm as the equations are ill-posed.
line_search_factor = 1
for ii in range(max_line_search_depth):
next_iterate = current_iterate + line_search_factor * update_iterate
next_func = moment_constraints(
next_iterate, twiddle_factors, moments, direction_increment
)
if np.linalg.norm(next_func) < magnitude_cur_func_eval:
# If we are decreasing- continue
current_func = next_func
current_iterate = next_iterate
break
else:
# The update may be too big as we are not decreasing the cost function
# magnitude. We will decrease the step size we take - but keep the
# direction of the step the same.
inverse_relative_update = magnitude_current_iterate / magnitude_update
line_search_factor = min(
inverse_relative_update, line_search_factor / 2
)
else:
# The linesearch failed. We could not find a factor that ensures the next
# function estimate is closer to 0.
convergence = False
break
else:
# We failed to converge after the maximum number of iterations.
convergence = False
if not convergence:
if use_mem_when_failing_to_converge:
directions = np.arctan2(twiddle_factors[1, :], twiddle_factors[0, :])
directional_distribution[:] = numba_mem(
directions, moments[0], moments[1], moments[2], moments[3]
)
else:
raise ValueError("we did not converge")
directional_distribution[:] = mem2_directional_distribution(
current_iterate, direction_increment, twiddle_factors
)
return directional_distribution
# mem2 functions
# ----------------------
@numba.njit(cache=True, fastmath=_FASTMATH)
def moment_constraints(lambdas, twiddle_factors, moments, direction_increment):
"""
Construct the nonlinear equations we need to solve for lambda. The constrainst are
the difference between the desired moments a1,b1,a2,b2 and the moment calculated
from the current distribution guess and for a perfect fit should be 0.
To note: we differ from Kim et al here who formulate the constraints using
unnormalized equations. Here we opt to use the normalized version as that
allows us to cast the error / or mismatch directly in terms of an error in the
moments.
:param lambdas: the lagrange multipliers
:param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4
by ndir array
:param moments: [a1,b1,a2,b2]
:param direction_increment: directional stepsize used in the integration, nd-array
:return: array (length=4) with the difference between desired moments and those
calculated from the current approximate distribution
"""
# Get the current estimate of the directional distribution
dist = mem2_directional_distribution(lambdas, direction_increment, twiddle_factors)
out = np.zeros(4)
for mm in range(0, 4):
# note - the part after the "-" is just a discrete approximation of the
# Fourier sine/cosine amplitude (moment)
out[mm] = moments[mm] - np.sum(
(twiddle_factors[mm, :]) * dist * direction_increment
)
return out
@numba.njit(cache=True, fastmath=_FASTMATH)
def mem2_jacobian(lagrange_multiplier, twiddle_factors, direction_increment, jacobian):
"""
Calculate the jacobian of the constraint equations. The resulting jacobian is a
square and positive definite matrix
:param lambdas: the lagrange multipliers
:param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a
4 by ndir array
:param direction_increment: directional stepsize used in the integration, nd-array
:return: a 4 by 4 matrix that is the Jacobian of the constraint equations.
"""
inner_product = np.zeros(twiddle_factors.shape[1])
for jj in range(0, 4):
inner_product = inner_product + lagrange_multiplier[jj] * twiddle_factors[jj, :]
# We subtract the minimum to ensure that the values in the exponent do not become
# too large. This amounts to multiplyig with a constant - which is fine since we
# normalize anyway. Effectively- this avoids overflow errors (or infinities)
# - at the expense of underflowing (which is less of an issue).
inner_product = inner_product - np.min(inner_product)
normalization = 1 / np.sum(np.exp(-inner_product) * direction_increment)
shape = np.exp(-inner_product)
normalization_derivative = np.zeros(4)
for mm in range(0, 4):
normalization_derivative[mm] = normalization * np.sum(
twiddle_factors[mm, :] * np.exp(-inner_product) * direction_increment
)
# To note- we have to multiply seperately to avoid potential
# underflow/overflow errors.
normalization_derivative = normalization_derivative * normalization
shape_derivative = np.zeros((4, twiddle_factors.shape[1]))
for mm in range(0, 4):
shape_derivative[mm, :] = -twiddle_factors[mm, :] * shape
for mm in range(0, 4):
# we make use of symmetry and only explicitly calculate up to the diagonal
for nn in range(0, mm + 1):
jacobian[mm, nn] = -np.sum(
twiddle_factors[mm, :]
* direction_increment
* (
normalization * shape_derivative[nn, :]
+ shape * normalization_derivative[nn]
),
-1,
)
if nn != mm:
jacobian[nn, mm] = jacobian[mm, nn]
return jacobian
@numba.njit(cache=True, fastmath=_FASTMATH)
def mem2_directional_distribution(
lagrange_multiplier, direction_increment, twiddle_factors
) -> np.ndarray:
"""
Given the solution for the Lagrange multipliers- reconstruct the directional
distribution.
:param lagrange_multiplier: the lagrange multipliers
:param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4 by
ndir array
:param direction_increment: directional stepsize used in the integration, nd-array
:return: Directional distribution arrasy as a function of directions
"""
inner_product = np.zeros(twiddle_factors.shape[1])
for jj in range(0, 4):
inner_product = inner_product + lagrange_multiplier[jj] * twiddle_factors[jj, :]
inner_product = inner_product - np.min(inner_product)
normalization = 1 / np.sum(np.exp(-inner_product) * direction_increment)
return np.exp(-inner_product) * normalization
@numba.njit(cache=True, fastmath=_FASTMATH)
def initial_value(a1: np.ndarray, b1: np.ndarray, a2: np.ndarray, b2: np.ndarray):
"""
Initial guess of the Lagrange Multipliers according to the "MEM AP2" approximation
found im Kim1995
:param a1: moment a1
:param b1: moment b1
:param a2: moment a2
:param b2: moment b2
:return: initial guess of the lagrange multipliers,
with the same leading dimensions as input.
"""
guess = np.empty((*a1.shape, 4))
fac = 1 + a1**2 + b1**2 + a2**2 + b2**2
guess[..., 0] = 2 * a1 * a2 + 2 * b1 * b2 - 2 * a1 * fac
guess[..., 1] = 2 * a1 * b2 - 2 * b1 * a2 - 2 * b1 * fac
guess[..., 2] = a1**2 - b1**2 - 2 * a2 * fac
guess[..., 3] = 2 * a1 * b1 - 2 * b2 * fac
return guess
@numba.njit(cache=True, fastmath=True)
def solve_cholesky(matrix, rhs):
"""
Solve using cholesky decomposition according to the Cholesky–Banachiewicz algorithm.
See: https://en.wikipedia.org/wiki/Cholesky_decomposition#The_Cholesky_algorithm
"""
M, N = matrix.shape
x = np.zeros(M)
cholesky_decomposition = np.zeros((M, M))
inv = np.zeros(M)
for mm in range(0, M):
forward_sub_sum = rhs[mm]
for nn in range(0, mm):
sum = matrix[mm, nn]
for kk in range(0, nn):
sum -= cholesky_decomposition[mm, kk] * cholesky_decomposition[nn, kk]
cholesky_decomposition[mm, nn] = inv[nn] * sum
forward_sub_sum += -cholesky_decomposition[mm, nn] * x[nn]
sum = matrix[mm, mm]
for kk in range(0, mm):
sum -= cholesky_decomposition[mm, kk] ** 2
if sum <= 0.0:
raise ValueError(
"Matrix not positive definite, likely due to finite precision errors."
)
cholesky_decomposition[mm, mm] = np.sqrt(sum)
inv[mm] = 1 / cholesky_decomposition[mm, mm]
x[mm] = forward_sub_sum * inv[mm]
# Backward Substitution (in place)
for mm in range(0, M):
kk = M - mm - 1
sum = x[kk]
for nn in range(kk + 1, N):
sum += -cholesky_decomposition[nn, kk] * x[nn]
x[kk] = sum * inv[kk]
return x
Functions
def initial_value(a1: numpy.ndarray, b1: numpy.ndarray, a2: numpy.ndarray, b2: numpy.ndarray)
-
Initial guess of the Lagrange Multipliers according to the "MEM AP2" approximation found im Kim1995
:param a1: moment a1 :param b1: moment b1 :param a2: moment a2 :param b2: moment b2 :return: initial guess of the lagrange multipliers, with the same leading dimensions as input.
Expand source code
@numba.njit(cache=True, fastmath=_FASTMATH) def initial_value(a1: np.ndarray, b1: np.ndarray, a2: np.ndarray, b2: np.ndarray): """ Initial guess of the Lagrange Multipliers according to the "MEM AP2" approximation found im Kim1995 :param a1: moment a1 :param b1: moment b1 :param a2: moment a2 :param b2: moment b2 :return: initial guess of the lagrange multipliers, with the same leading dimensions as input. """ guess = np.empty((*a1.shape, 4)) fac = 1 + a1**2 + b1**2 + a2**2 + b2**2 guess[..., 0] = 2 * a1 * a2 + 2 * b1 * b2 - 2 * a1 * fac guess[..., 1] = 2 * a1 * b2 - 2 * b1 * a2 - 2 * b1 * fac guess[..., 2] = a1**2 - b1**2 - 2 * a2 * fac guess[..., 3] = 2 * a1 * b1 - 2 * b2 * fac return guess
def mem2(directions_radians: numpy.ndarray, a1: numpy.ndarray, b1: numpy.ndarray, a2: numpy.ndarray, b2: numpy.ndarray, progress_bar: numba_progress.progress.ProgressBar = None, solution_method='newton', solver_config=None) ‑> numpy.ndarray
-
:param directions_radians: :param a1: :param b1: :param a2: :param b2: :param solution_method: :return:
Expand source code
def mem2( directions_radians: np.ndarray, a1: np.ndarray, b1: np.ndarray, a2: np.ndarray, b2: np.ndarray, progress_bar: ProgressBar = None, solution_method="newton", solver_config=None, ) -> np.ndarray: """ :param directions_radians: :param a1: :param b1: :param a2: :param b2: :param solution_method: :return: """ if solver_config is None: solver_config = NUMERICS else: solver_config = NUMERICS | solver_config if solution_method == "scipy": func = mem2_scipy_root_finder kwargs = {} elif solution_method == "newton": func = mem2_newton numba_solver_config = numba.typed.Dict.empty( key_type=numba.core.types.unicode_type, value_type=numba.core.types.float64 ) for key in solver_config: numba_solver_config[key] = solver_config[key] kwargs = {"config": numba_solver_config} elif solution_method == "approximate": func = mem2_newton kwargs = {"approximate": True} else: raise ValueError("Unknown method") return func(directions_radians, a1, b1, a2, b2, progress_bar, **kwargs)
def mem2_directional_distribution(lagrange_multiplier, direction_increment, twiddle_factors) ‑> numpy.ndarray
-
Given the solution for the Lagrange multipliers- reconstruct the directional distribution. :param lagrange_multiplier: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2theta, cos 2theta] as a 4 by ndir array :param direction_increment: directional stepsize used in the integration, nd-array :return: Directional distribution arrasy as a function of directions
Expand source code
@numba.njit(cache=True, fastmath=_FASTMATH) def mem2_directional_distribution( lagrange_multiplier, direction_increment, twiddle_factors ) -> np.ndarray: """ Given the solution for the Lagrange multipliers- reconstruct the directional distribution. :param lagrange_multiplier: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4 by ndir array :param direction_increment: directional stepsize used in the integration, nd-array :return: Directional distribution arrasy as a function of directions """ inner_product = np.zeros(twiddle_factors.shape[1]) for jj in range(0, 4): inner_product = inner_product + lagrange_multiplier[jj] * twiddle_factors[jj, :] inner_product = inner_product - np.min(inner_product) normalization = 1 / np.sum(np.exp(-inner_product) * direction_increment) return np.exp(-inner_product) * normalization
def mem2_jacobian(lagrange_multiplier, twiddle_factors, direction_increment, jacobian)
-
Calculate the jacobian of the constraint equations. The resulting jacobian is a square and positive definite matrix
:param lambdas: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2theta, cos 2theta] as a 4 by ndir array :param direction_increment: directional stepsize used in the integration, nd-array
:return: a 4 by 4 matrix that is the Jacobian of the constraint equations.
Expand source code
@numba.njit(cache=True, fastmath=_FASTMATH) def mem2_jacobian(lagrange_multiplier, twiddle_factors, direction_increment, jacobian): """ Calculate the jacobian of the constraint equations. The resulting jacobian is a square and positive definite matrix :param lambdas: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4 by ndir array :param direction_increment: directional stepsize used in the integration, nd-array :return: a 4 by 4 matrix that is the Jacobian of the constraint equations. """ inner_product = np.zeros(twiddle_factors.shape[1]) for jj in range(0, 4): inner_product = inner_product + lagrange_multiplier[jj] * twiddle_factors[jj, :] # We subtract the minimum to ensure that the values in the exponent do not become # too large. This amounts to multiplyig with a constant - which is fine since we # normalize anyway. Effectively- this avoids overflow errors (or infinities) # - at the expense of underflowing (which is less of an issue). inner_product = inner_product - np.min(inner_product) normalization = 1 / np.sum(np.exp(-inner_product) * direction_increment) shape = np.exp(-inner_product) normalization_derivative = np.zeros(4) for mm in range(0, 4): normalization_derivative[mm] = normalization * np.sum( twiddle_factors[mm, :] * np.exp(-inner_product) * direction_increment ) # To note- we have to multiply seperately to avoid potential # underflow/overflow errors. normalization_derivative = normalization_derivative * normalization shape_derivative = np.zeros((4, twiddle_factors.shape[1])) for mm in range(0, 4): shape_derivative[mm, :] = -twiddle_factors[mm, :] * shape for mm in range(0, 4): # we make use of symmetry and only explicitly calculate up to the diagonal for nn in range(0, mm + 1): jacobian[mm, nn] = -np.sum( twiddle_factors[mm, :] * direction_increment * ( normalization * shape_derivative[nn, :] + shape * normalization_derivative[nn] ), -1, ) if nn != mm: jacobian[nn, mm] = jacobian[mm, nn] return jacobian
def mem2_newton(directions_radians: numpy.ndarray, a1: numpy.ndarray, b1: numpy.ndarray, a2: numpy.ndarray, b2: numpy.ndarray, progress_bar: numba_progress.progress.ProgressBar = None, config: numba.typed.typeddict.Dict = None, approximate: bool = False) ‑> numpy.ndarray
-
Return the directional distribution that maximizes Shannon [ - D log(D) ] enthrophy constrained by given observed directional moments.
:param directions_radians: 1d array of wave directions in radians, length[number_of_directions]
:param a1: 1d array of cosine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies)
:param b1: 1d array of sine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies)
:param a2: 1d array of double angle cosine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies)
:param b2: 1d array of double angle sine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies)
:param progress_bar: Progress bar instance if updates are desired.
:return: array with shape [ numbrt_of_points, number_of_frequencies, number_of_direction ] representing the directional distribution of the waves at each frequency.
Maximize the enthrophy of the solution with entrophy defined as:
integrate - D * log(D) over directions
such that the resulting distribution D reproduces the observed moments.
Expand source code
@numba.njit(parallel=_PARALLEL, cache=(not _PARALLEL)) def mem2_newton( directions_radians: np.ndarray, a1: np.ndarray, b1: np.ndarray, a2: np.ndarray, b2: np.ndarray, progress_bar: ProgressBar = None, config: numba.typed.Dict = None, approximate: bool = False, ) -> np.ndarray: """ Return the directional distribution that maximizes Shannon [ - D log(D) ] enthrophy constrained by given observed directional moments. :param directions_radians: 1d array of wave directions in radians, length[number_of_directions] :param a1: 1d array of cosine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies) :param b1: 1d array of sine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies) :param a2: 1d array of double angle cosine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies) :param b2: 1d array of double angle sine directional moment as function of position and frequency, shape = ( number_of_points,number_of_frequencies) :param progress_bar: Progress bar instance if updates are desired. :return: array with shape [ numbrt_of_points, number_of_frequencies, number_of_direction ] representing the directional distribution of the waves at each frequency. Maximize the enthrophy of the solution with entrophy defined as: integrate - D * log(D) over directions such that the resulting distribution D reproduces the observed moments. """ number_of_frequencies = a1.shape[-1] number_of_points = a1.shape[0] directional_distribution = np.zeros( (number_of_points, number_of_frequencies, len(directions_radians)) ) direction_increment_downward_difference = ( directions_radians - np.roll(directions_radians, 1) + np.pi ) % (2 * np.pi) - np.pi direction_increment_upward_difference = ( -(directions_radians - np.roll(directions_radians, -1) + np.pi) % (2 * np.pi) - np.pi ) direction_increment = ( direction_increment_downward_difference + direction_increment_upward_difference ) / 2 # Calculate the needed Fourier transform twiddle factors to calculate moments. twiddle_factors = np.empty((4, len(directions_radians))) twiddle_factors[0, :] = np.cos(directions_radians) twiddle_factors[1, :] = np.sin(directions_radians) twiddle_factors[2, :] = np.cos(2 * directions_radians) twiddle_factors[3, :] = np.sin(2 * directions_radians) guess = initial_value(a1, b1, a2, b2) for ipoint in numba.prange(0, number_of_points): if progress_bar is not None: progress_bar.update(1) # Note; entries to directional_distribution[ipoint, :, :] is modified in the # call below. This avoids creation of memory for the resulting array at the # expense of allowing for side-effects. _mem2_newton_point( directional_distribution[ipoint, :, :], a1[ipoint, :], b1[ipoint, :], a2[ipoint, :], b2[ipoint, :], guess[ipoint, :, :], direction_increment, twiddle_factors, config, approximate, ) return directional_distribution
def mem2_newton_solver(moments: numpy.ndarray, guess: numpy.ndarray, direction_increment: numpy.ndarray, twiddle_factors: numpy.ndarray, config=None, approximate=False) ‑> numpy.ndarray
-
Newton iteration to find the solution to the non-linear system of constraint equations defining the lagrange multipliers in the MEM2 method. Because the Lagrange multipliers enter the equations as exponents the system can be unstable to solve numerically.
:param moments: the normalized directional moments [a1,b1,a2,b2] :param guess: first guess for the lagrange multipliers (ndarray, length 4) :param direction_increment: directional stepsize used in the integration, nd-array :param twiddle_factors: [sin theta, cost theta, sin 2theta, cos 2theta] as a 4 by ndir array :param config: numerical settings, see description at NUMERICS at top of file. :param approximate: whether or not to use the approximate relations. :return:
Expand source code
@numba.njit(cache=True) def mem2_newton_solver( moments: np.ndarray, guess: np.ndarray, direction_increment: np.ndarray, twiddle_factors: np.ndarray, config=None, approximate=False, ) -> np.ndarray: """ Newton iteration to find the solution to the non-linear system of constraint equations defining the lagrange multipliers in the MEM2 method. Because the Lagrange multipliers enter the equations as exponents the system can be unstable to solve numerically. :param moments: the normalized directional moments [a1,b1,a2,b2] :param guess: first guess for the lagrange multipliers (ndarray, length 4) :param direction_increment: directional stepsize used in the integration, nd-array :param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4 by ndir array :param config: numerical settings, see description at NUMERICS at top of file. :param approximate: whether or not to use the approximate relations. :return: """ if config is None: max_iter = 100 rcond = 1e-6 atol = 0.01 max_line_search_depth = 8 use_mem_when_failing_to_converge = True else: max_iter = config["max_iter"] rcond = config["rcond"] atol = config["atol"] max_line_search_depth = config["max_line_search_depth"] use_mem_when_failing_to_converge = ( config["use_mem_when_failing_to_converge"] > 0.0 ) directional_distribution = np.empty(len(direction_increment)) if np.any(np.isnan(guess)): directional_distribution[:] = 0 return directional_distribution if approximate: directional_distribution[:] = mem2_directional_distribution( guess, direction_increment, twiddle_factors ) return directional_distribution current_iterate = guess current_func = moment_constraints( current_iterate, twiddle_factors, moments, direction_increment, ) jacobian = np.empty((4, 4)) convergence = False for iter in range(0, max_iter): # Stopping criterium magnitude_cur_func_eval = np.linalg.norm(current_func) if magnitude_cur_func_eval < atol: convergence = True break # # Compute jacobian, and find newton iterate innovation as we solve for: # # jacobian @ delta = - current_iterate_func_eval # # with: # # delta = next_lagrange_multiplier_iterate-cur_lagrange_multiplier_iterate jacobian = mem2_jacobian( current_iterate, twiddle_factors, direction_increment, jacobian ) try: update_iterate = solve_cholesky(jacobian, -current_func) except Exception: update_iterate = np.linalg.lstsq(jacobian, -current_func, rcond=rcond)[0] magnitude_current_iterate = np.linalg.norm(current_iterate) magnitude_update = np.linalg.norm(update_iterate) # Do a line search for the optimum decrease. This is intended to stabilize the # algorithm as the equations are ill-posed. line_search_factor = 1 for ii in range(max_line_search_depth): next_iterate = current_iterate + line_search_factor * update_iterate next_func = moment_constraints( next_iterate, twiddle_factors, moments, direction_increment ) if np.linalg.norm(next_func) < magnitude_cur_func_eval: # If we are decreasing- continue current_func = next_func current_iterate = next_iterate break else: # The update may be too big as we are not decreasing the cost function # magnitude. We will decrease the step size we take - but keep the # direction of the step the same. inverse_relative_update = magnitude_current_iterate / magnitude_update line_search_factor = min( inverse_relative_update, line_search_factor / 2 ) else: # The linesearch failed. We could not find a factor that ensures the next # function estimate is closer to 0. convergence = False break else: # We failed to converge after the maximum number of iterations. convergence = False if not convergence: if use_mem_when_failing_to_converge: directions = np.arctan2(twiddle_factors[1, :], twiddle_factors[0, :]) directional_distribution[:] = numba_mem( directions, moments[0], moments[1], moments[2], moments[3] ) else: raise ValueError("we did not converge") directional_distribution[:] = mem2_directional_distribution( current_iterate, direction_increment, twiddle_factors ) return directional_distribution
def mem2_scipy_root_finder(directions_radians: numpy.ndarray, a1: numpy.ndarray, b1: numpy.ndarray, a2: numpy.ndarray, b2: numpy.ndarray, progress, **kwargs) ‑> numpy.ndarray
-
Return the directional distribution that maximizes Shannon [ - D log(D) ] enthrophy constrained by given observed directional moments,
:param directions_radians: 1d array of wave directions in radians, length[number_of_directions]
:param a1: 1d array of cosine directional moment as function of frequency, length [number_of_frequencies]
:param b1: 1d array of sine directional moment as function of frequency, length [number_of_frequencies]
:param a2: 1d array of double angle cosine directional moment as function of frequency, length [number_of_frequencies]
:param b2: 1d array of double angle sine directional moment as function of frequency, length [number_of_frequencies]
:return: array with shape [number_of_frequencies,number_of_direction] representing the directional distribution of the waves at each frequency.
Maximize the enthrophy of the solution with entrophy defined as:
integrate - D * log(D) over directions
such that the resulting distribution D reproduces the observed moments.
Expand source code
def mem2_scipy_root_finder( directions_radians: np.ndarray, a1: np.ndarray, b1: np.ndarray, a2: np.ndarray, b2: np.ndarray, progress, **kwargs ) -> np.ndarray: """ Return the directional distribution that maximizes Shannon [ - D log(D) ] enthrophy constrained by given observed directional moments, :param directions_radians: 1d array of wave directions in radians, length[number_of_directions] :param a1: 1d array of cosine directional moment as function of frequency, length [number_of_frequencies] :param b1: 1d array of sine directional moment as function of frequency, length [number_of_frequencies] :param a2: 1d array of double angle cosine directional moment as function of frequency, length [number_of_frequencies] :param b2: 1d array of double angle sine directional moment as function of frequency, length [number_of_frequencies] :return: array with shape [number_of_frequencies,number_of_direction] representing the directional distribution of the waves at each frequency. Maximize the enthrophy of the solution with entrophy defined as: integrate - D * log(D) over directions such that the resulting distribution D reproduces the observed moments. """ number_of_frequencies = a1.shape[-1] number_of_points = a1.shape[0] directional_distribution = np.zeros( (number_of_points, number_of_frequencies, len(directions_radians)) ) direction_increment = get_direction_increment(directions_radians) twiddle_factors = np.empty((4, len(directions_radians))) twiddle_factors[0, :] = np.cos(directions_radians) twiddle_factors[1, :] = np.sin(directions_radians) twiddle_factors[2, :] = np.cos(2 * directions_radians) twiddle_factors[3, :] = np.sin(2 * directions_radians) guess = initial_value(a1, b1, a2, b2) for ipoint in range(0, number_of_points): progress.update(1) for ifreq in range(0, number_of_frequencies): # moments = np.array( [ a1[ipoint, ifreq], b1[ipoint, ifreq], a2[ipoint, ifreq], b2[ipoint, ifreq], ] ) if np.any(np.isnan(guess[ipoint, ifreq, :])): continue res = scipy.optimize.root( moment_constraints, guess[ipoint, ifreq, :], args=(twiddle_factors, moments, direction_increment), method="lm", ) lambas = res.x directional_distribution[ipoint, ifreq, :] = mem2_directional_distribution( lambas, direction_increment, twiddle_factors ) return directional_distribution
def moment_constraints(lambdas, twiddle_factors, moments, direction_increment)
-
Construct the nonlinear equations we need to solve for lambda. The constrainst are the difference between the desired moments a1,b1,a2,b2 and the moment calculated from the current distribution guess and for a perfect fit should be 0.
To note: we differ from Kim et al here who formulate the constraints using unnormalized equations. Here we opt to use the normalized version as that allows us to cast the error / or mismatch directly in terms of an error in the moments.
:param lambdas: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2theta, cos 2theta] as a 4 by ndir array :param moments: [a1,b1,a2,b2] :param direction_increment: directional stepsize used in the integration, nd-array :return: array (length=4) with the difference between desired moments and those calculated from the current approximate distribution
Expand source code
@numba.njit(cache=True, fastmath=_FASTMATH) def moment_constraints(lambdas, twiddle_factors, moments, direction_increment): """ Construct the nonlinear equations we need to solve for lambda. The constrainst are the difference between the desired moments a1,b1,a2,b2 and the moment calculated from the current distribution guess and for a perfect fit should be 0. To note: we differ from Kim et al here who formulate the constraints using unnormalized equations. Here we opt to use the normalized version as that allows us to cast the error / or mismatch directly in terms of an error in the moments. :param lambdas: the lagrange multipliers :param twiddle_factors: [sin theta, cost theta, sin 2*theta, cos 2*theta] as a 4 by ndir array :param moments: [a1,b1,a2,b2] :param direction_increment: directional stepsize used in the integration, nd-array :return: array (length=4) with the difference between desired moments and those calculated from the current approximate distribution """ # Get the current estimate of the directional distribution dist = mem2_directional_distribution(lambdas, direction_increment, twiddle_factors) out = np.zeros(4) for mm in range(0, 4): # note - the part after the "-" is just a discrete approximation of the # Fourier sine/cosine amplitude (moment) out[mm] = moments[mm] - np.sum( (twiddle_factors[mm, :]) * dist * direction_increment ) return out
def solve_cholesky(matrix, rhs)
-
Solve using cholesky decomposition according to the Cholesky–Banachiewicz algorithm. See: https://en.wikipedia.org/wiki/Cholesky_decomposition#The_Cholesky_algorithm
Expand source code
@numba.njit(cache=True, fastmath=True) def solve_cholesky(matrix, rhs): """ Solve using cholesky decomposition according to the Cholesky–Banachiewicz algorithm. See: https://en.wikipedia.org/wiki/Cholesky_decomposition#The_Cholesky_algorithm """ M, N = matrix.shape x = np.zeros(M) cholesky_decomposition = np.zeros((M, M)) inv = np.zeros(M) for mm in range(0, M): forward_sub_sum = rhs[mm] for nn in range(0, mm): sum = matrix[mm, nn] for kk in range(0, nn): sum -= cholesky_decomposition[mm, kk] * cholesky_decomposition[nn, kk] cholesky_decomposition[mm, nn] = inv[nn] * sum forward_sub_sum += -cholesky_decomposition[mm, nn] * x[nn] sum = matrix[mm, mm] for kk in range(0, mm): sum -= cholesky_decomposition[mm, kk] ** 2 if sum <= 0.0: raise ValueError( "Matrix not positive definite, likely due to finite precision errors." ) cholesky_decomposition[mm, mm] = np.sqrt(sum) inv[mm] = 1 / cholesky_decomposition[mm, mm] x[mm] = forward_sub_sum * inv[mm] # Backward Substitution (in place) for mm in range(0, M): kk = M - mm - 1 sum = x[kk] for nn in range(kk + 1, N): sum += -cholesky_decomposition[nn, kk] * x[nn] x[kk] = sum * inv[kk] return x