Module ocean_science_utilities.wavephysics.balance.solvers

Expand source code
import numba  # type: ignore
import numpy as np
import xarray

from typing import TypeVar

from ocean_science_utilities.wavephysics.balance._numba_settings import numba_nocache

_T = TypeVar("_T", np.typing.ArrayLike, xarray.DataArray)


@numba.jit(**numba_nocache)
def numba_fixed_point_iteration(
    function,
    guess,
    args,
    bounds=(-np.inf, np.inf),
) -> _T:
    """
    Fixed point iteration on a vector function. We want to solve the parallal problem
    x=F(x) where x is a vector. Instead of looping over each problem and solving them
    individualy using e.g. scipy solvers, we gain some efficiency by evaluating F in
    parallel, and doing the iteration ourselves. Only worthwhile if F is the
    expensive part and/or x is large.

    :param function:
    :param guess:
    :param max_iter:
    :param atol:
    :param rtol:
    :param caller:
    :return:
    """

    iterates = [guess, guess, guess]
    max_iter = 100
    aitken_acceleration = True
    rtol = 1e-4
    atol = 1e-4
    for current_iteration in range(1, max_iter + 1):
        # Update iterate
        aitken_step = aitken_acceleration and current_iteration % 3 == 0
        if aitken_step:
            # Every third step do an aitken acceleration step- if requested
            numerator = iterates[2] - iterates[1]
            denominator = iterates[1] - iterates[0]

            if denominator != 0.0 and numerator != denominator:
                ratio = numerator / denominator
                next_iterate = iterates[2] + ratio / (1.0 - ratio) * (numerator)
            else:
                next_iterate = iterates[2]
        else:
            next_iterate = function(iterates[2], *args)

        # Bounds check
        if next_iterate < bounds[0]:
            next_iterate = (bounds[0] - iterates[2]) * 0.5 + iterates[2]

        if next_iterate > bounds[1]:
            next_iterate = (bounds[1] - iterates[2]) * 0.5 + iterates[2]

        # Roll the iterates, make the last entry the latest estimate
        iterates.append(iterates.pop(0))
        iterates[2] = next_iterate

        # Convergence check
        scale = max(np.abs(iterates[1]), atol)
        absolute_difference = np.abs(iterates[2] - iterates[1])
        relative_difference = absolute_difference / scale

        if (absolute_difference < atol) & (
            relative_difference < rtol
        ) and not aitken_step:
            break
    else:
        raise ValueError("no convergence")

    return iterates[2]


@numba.jit(**numba_nocache)
def numba_newton_raphson(
    function,
    guess,
    function_arguments,
    hard_bounds=(-np.inf, np.inf),
    max_iterations=100,
    aitken_acceleration=True,
    atol=1e-4,
    rtol=1e-4,
    numerical_stepsize=1e-4,
    verbose=False,
    error_on_max_iter=True,
    relative_stepsize=False,
    name="",
    under_relaxation=0.9,
):
    # The last three iterates. Newest iterate last. We initially fill all
    # with the guess.
    iterates = [guess, guess, guess]

    # The function evaluatio at the last three iterate, last evation last.
    # We initially fill all with 0. so that entries
    # are invalid until we get to the 3rd iterate.
    func_evals = [0.0, 0.0, 0.0]

    # Guess root bounds. Initially this may not actually bound the root, but if
    # the guess is reasonable, it may.
    root_bounds = [guess - 0.5 * np.abs(guess), guess + 0.5 * np.abs(guess)]

    # The function evaluated at the bounds. We may get lucky and immediately
    # bound the root.
    func_at_bounds = [
        function(root_bounds[0], *function_arguments),
        function(root_bounds[1], *function_arguments),
    ]

    # Is the root bounded?
    root_bounded = func_at_bounds[0] * func_at_bounds[1] < 0

    # Start iteration.
    for current_iteration in range(1, max_iterations):
        # Evaluate the function at the latest iterate. First roll the list...
        func_evals.append(func_evals.pop(0))
        # ... and then update the latest point.
        func_evals[2] = function(iterates[2], *function_arguments)

        # Every 3rd step we do a Aitken series acceleration step
        aitken_step = aitken_acceleration and current_iteration % 3 == 0
        if aitken_step:
            # We do an Aitkon step
            numerator = iterates[2] - iterates[1]
            denominator = iterates[1] - iterates[0]
            ratio = numerator / denominator
            next_iterate = iterates[2] + ratio / (1.0 - ratio) * (numerator)

        else:
            # We use a regular update step. This can either be: numerical estimate
            # for Newton Raphson, Secant, or a bisection step.

            # First lets find out if our itereate is within the root bounds. Initially,
            # when the root is not yet bounded we will escape the bounds as we do
            # gradient descent with Newton.
            if iterates[2] < root_bounds[0]:
                # our iterate is smaller than the left bound - set the left bound to
                # the iterate, but respect the overall bounds...
                root_bounds[0] = iterates[2]
                func_at_bounds[0] = func_evals[2]

            elif iterates[2] > root_bounds[1]:
                root_bounds[1] = iterates[2]
                func_at_bounds[1] = func_evals[2]

            # If we are within the current bounds, does any of the sections
            # contain the root?
            elif func_at_bounds[0] * func_evals[2] < 0:
                func_at_bounds[1] = func_evals[2]
                root_bounds[1] = iterates[2]
            elif func_at_bounds[1] * func_evals[2] < 0:
                func_at_bounds[0] = func_evals[2]
                root_bounds[0] = iterates[2]

            # With the boundaries updated, check if we now bound the root.
            root_bounded = func_at_bounds[0] * func_at_bounds[1] < 0

            # If we bound the root we can start using secant - if it wants to
            # escape the bounds we will just use a bisection step, if it doesn't
            # secant is faster than bisection.
            if root_bounded and current_iteration > 1:
                # Secant estimate
                derivative = (func_evals[2] - func_evals[1]) / (
                    iterates[2] - iterates[1]
                )
            else:
                if relative_stepsize:
                    stepsize = iterates[2] * numerical_stepsize
                else:
                    stepsize = numerical_stepsize

                # Finite difference estimate
                derivative = (
                    function(iterates[2] + stepsize, *function_arguments)
                    - func_evals[2]
                ) / stepsize

            # We encountered a stationary point.
            if derivative == 0.0:
                # If we have the root bounded- let's do a bisection step to keep going.
                if root_bounded:
                    update = (root_bounds[1] - root_bounds[0]) / 2

                else:
                    raise ValueError("")

            else:
                update = -func_evals[2] / derivative

            next_iterate = iterates[2] + update * under_relaxation

        # Bounds check
        if not root_bounded:
            bounds_to_check = hard_bounds
        else:
            bounds_to_check = (root_bounds[0], root_bounds[1])

        if next_iterate < bounds_to_check[0]:
            next_iterate = (bounds_to_check[0] - iterates[2]) * 0.5 + iterates[2]

        if next_iterate > bounds_to_check[1]:
            next_iterate = (bounds_to_check[1] - iterates[2]) * 0.5 + iterates[2]

        # Roll the iterates, make the last entry the latest estimate
        iterates.append(iterates.pop(0))
        iterates[2] = next_iterate

        # Convergence check
        scale = max(np.abs(iterates[1]), atol)
        absolute_difference = np.abs(iterates[2] - iterates[1])
        relative_difference = absolute_difference / scale

        if verbose:
            # In case anybody wonders - this monstrosity is needed because Numba does
            # not currently support converting floats to strings.
            print(
                "name:",
                name,
                "Iteration",
                current_iteration,
                "max abs. errors:",
                absolute_difference,
                "(atol:",
                atol,
                ") max rel. error:",
                relative_difference,
                "(rtol: ",
                rtol,
                ") ",
                "current value",
                iterates[2],
            )

        if (absolute_difference < atol) & (relative_difference < rtol):
            break

    else:
        if error_on_max_iter:
            if verbose:
                print("no convergence", name, root_bounded)
            raise ValueError("no convergence")
        else:
            return iterates[2]

    return iterates[2]

Functions

def numba_fixed_point_iteration(function, guess, args, bounds=(-inf, inf)) ‑> ~_T

Fixed point iteration on a vector function. We want to solve the parallal problem x=F(x) where x is a vector. Instead of looping over each problem and solving them individualy using e.g. scipy solvers, we gain some efficiency by evaluating F in parallel, and doing the iteration ourselves. Only worthwhile if F is the expensive part and/or x is large.

:param function: :param guess: :param max_iter: :param atol: :param rtol: :param caller: :return:

Expand source code
@numba.jit(**numba_nocache)
def numba_fixed_point_iteration(
    function,
    guess,
    args,
    bounds=(-np.inf, np.inf),
) -> _T:
    """
    Fixed point iteration on a vector function. We want to solve the parallal problem
    x=F(x) where x is a vector. Instead of looping over each problem and solving them
    individualy using e.g. scipy solvers, we gain some efficiency by evaluating F in
    parallel, and doing the iteration ourselves. Only worthwhile if F is the
    expensive part and/or x is large.

    :param function:
    :param guess:
    :param max_iter:
    :param atol:
    :param rtol:
    :param caller:
    :return:
    """

    iterates = [guess, guess, guess]
    max_iter = 100
    aitken_acceleration = True
    rtol = 1e-4
    atol = 1e-4
    for current_iteration in range(1, max_iter + 1):
        # Update iterate
        aitken_step = aitken_acceleration and current_iteration % 3 == 0
        if aitken_step:
            # Every third step do an aitken acceleration step- if requested
            numerator = iterates[2] - iterates[1]
            denominator = iterates[1] - iterates[0]

            if denominator != 0.0 and numerator != denominator:
                ratio = numerator / denominator
                next_iterate = iterates[2] + ratio / (1.0 - ratio) * (numerator)
            else:
                next_iterate = iterates[2]
        else:
            next_iterate = function(iterates[2], *args)

        # Bounds check
        if next_iterate < bounds[0]:
            next_iterate = (bounds[0] - iterates[2]) * 0.5 + iterates[2]

        if next_iterate > bounds[1]:
            next_iterate = (bounds[1] - iterates[2]) * 0.5 + iterates[2]

        # Roll the iterates, make the last entry the latest estimate
        iterates.append(iterates.pop(0))
        iterates[2] = next_iterate

        # Convergence check
        scale = max(np.abs(iterates[1]), atol)
        absolute_difference = np.abs(iterates[2] - iterates[1])
        relative_difference = absolute_difference / scale

        if (absolute_difference < atol) & (
            relative_difference < rtol
        ) and not aitken_step:
            break
    else:
        raise ValueError("no convergence")

    return iterates[2]
def numba_newton_raphson(function, guess, function_arguments, hard_bounds=(-inf, inf), max_iterations=100, aitken_acceleration=True, atol=0.0001, rtol=0.0001, numerical_stepsize=0.0001, verbose=False, error_on_max_iter=True, relative_stepsize=False, name='', under_relaxation=0.9)
Expand source code
@numba.jit(**numba_nocache)
def numba_newton_raphson(
    function,
    guess,
    function_arguments,
    hard_bounds=(-np.inf, np.inf),
    max_iterations=100,
    aitken_acceleration=True,
    atol=1e-4,
    rtol=1e-4,
    numerical_stepsize=1e-4,
    verbose=False,
    error_on_max_iter=True,
    relative_stepsize=False,
    name="",
    under_relaxation=0.9,
):
    # The last three iterates. Newest iterate last. We initially fill all
    # with the guess.
    iterates = [guess, guess, guess]

    # The function evaluatio at the last three iterate, last evation last.
    # We initially fill all with 0. so that entries
    # are invalid until we get to the 3rd iterate.
    func_evals = [0.0, 0.0, 0.0]

    # Guess root bounds. Initially this may not actually bound the root, but if
    # the guess is reasonable, it may.
    root_bounds = [guess - 0.5 * np.abs(guess), guess + 0.5 * np.abs(guess)]

    # The function evaluated at the bounds. We may get lucky and immediately
    # bound the root.
    func_at_bounds = [
        function(root_bounds[0], *function_arguments),
        function(root_bounds[1], *function_arguments),
    ]

    # Is the root bounded?
    root_bounded = func_at_bounds[0] * func_at_bounds[1] < 0

    # Start iteration.
    for current_iteration in range(1, max_iterations):
        # Evaluate the function at the latest iterate. First roll the list...
        func_evals.append(func_evals.pop(0))
        # ... and then update the latest point.
        func_evals[2] = function(iterates[2], *function_arguments)

        # Every 3rd step we do a Aitken series acceleration step
        aitken_step = aitken_acceleration and current_iteration % 3 == 0
        if aitken_step:
            # We do an Aitkon step
            numerator = iterates[2] - iterates[1]
            denominator = iterates[1] - iterates[0]
            ratio = numerator / denominator
            next_iterate = iterates[2] + ratio / (1.0 - ratio) * (numerator)

        else:
            # We use a regular update step. This can either be: numerical estimate
            # for Newton Raphson, Secant, or a bisection step.

            # First lets find out if our itereate is within the root bounds. Initially,
            # when the root is not yet bounded we will escape the bounds as we do
            # gradient descent with Newton.
            if iterates[2] < root_bounds[0]:
                # our iterate is smaller than the left bound - set the left bound to
                # the iterate, but respect the overall bounds...
                root_bounds[0] = iterates[2]
                func_at_bounds[0] = func_evals[2]

            elif iterates[2] > root_bounds[1]:
                root_bounds[1] = iterates[2]
                func_at_bounds[1] = func_evals[2]

            # If we are within the current bounds, does any of the sections
            # contain the root?
            elif func_at_bounds[0] * func_evals[2] < 0:
                func_at_bounds[1] = func_evals[2]
                root_bounds[1] = iterates[2]
            elif func_at_bounds[1] * func_evals[2] < 0:
                func_at_bounds[0] = func_evals[2]
                root_bounds[0] = iterates[2]

            # With the boundaries updated, check if we now bound the root.
            root_bounded = func_at_bounds[0] * func_at_bounds[1] < 0

            # If we bound the root we can start using secant - if it wants to
            # escape the bounds we will just use a bisection step, if it doesn't
            # secant is faster than bisection.
            if root_bounded and current_iteration > 1:
                # Secant estimate
                derivative = (func_evals[2] - func_evals[1]) / (
                    iterates[2] - iterates[1]
                )
            else:
                if relative_stepsize:
                    stepsize = iterates[2] * numerical_stepsize
                else:
                    stepsize = numerical_stepsize

                # Finite difference estimate
                derivative = (
                    function(iterates[2] + stepsize, *function_arguments)
                    - func_evals[2]
                ) / stepsize

            # We encountered a stationary point.
            if derivative == 0.0:
                # If we have the root bounded- let's do a bisection step to keep going.
                if root_bounded:
                    update = (root_bounds[1] - root_bounds[0]) / 2

                else:
                    raise ValueError("")

            else:
                update = -func_evals[2] / derivative

            next_iterate = iterates[2] + update * under_relaxation

        # Bounds check
        if not root_bounded:
            bounds_to_check = hard_bounds
        else:
            bounds_to_check = (root_bounds[0], root_bounds[1])

        if next_iterate < bounds_to_check[0]:
            next_iterate = (bounds_to_check[0] - iterates[2]) * 0.5 + iterates[2]

        if next_iterate > bounds_to_check[1]:
            next_iterate = (bounds_to_check[1] - iterates[2]) * 0.5 + iterates[2]

        # Roll the iterates, make the last entry the latest estimate
        iterates.append(iterates.pop(0))
        iterates[2] = next_iterate

        # Convergence check
        scale = max(np.abs(iterates[1]), atol)
        absolute_difference = np.abs(iterates[2] - iterates[1])
        relative_difference = absolute_difference / scale

        if verbose:
            # In case anybody wonders - this monstrosity is needed because Numba does
            # not currently support converting floats to strings.
            print(
                "name:",
                name,
                "Iteration",
                current_iteration,
                "max abs. errors:",
                absolute_difference,
                "(atol:",
                atol,
                ") max rel. error:",
                relative_difference,
                "(rtol: ",
                rtol,
                ") ",
                "current value",
                iterates[2],
            )

        if (absolute_difference < atol) & (relative_difference < rtol):
            break

    else:
        if error_on_max_iter:
            if verbose:
                print("no convergence", name, root_bounded)
            raise ValueError("no convergence")
        else:
            return iterates[2]

    return iterates[2]