Module ocean_science_utilities.wavephysics.train_wind_estimate
Source code
import numpy as np
import xarray
from typing import List, Optional
from scipy.optimize import minimize # type: ignore
from ocean_science_utilities.wavephysics.balance.balance import SourceTermBalance
from ocean_science_utilities.wavephysics.balance.wind_inversion import (
windspeed_and_direction_from_spectra,
)
from ocean_science_utilities.wavespectra.spectrum import (
FrequencySpectrum,
FrequencyDirectionSpectrum,
WaveSpectrum,
)
from ocean_science_utilities.wavephysics.windestimate import estimate_u10_from_spectrum

def rmse(target, actual, jacobian_actual=None, weights=None):
    # Half of the weighted sum of squared errors (half the mean squared error
    # for the default weights of 1/N); the gradient below is consistent with
    # this definition.
    N = len(target)
    if weights is None:
        weights = 1 / N

    mean_error = sum(weights * (actual - target) ** 2) / 2
    if jacobian_actual is not None:
        error_gradient = (weights * (actual - target))[None, :] @ jacobian_actual
        return mean_error, np.squeeze(error_gradient)
    else:
        return mean_error

def mae(target, actual, jacobian_actual=None, weights=None):
    # Mean absolute error. The weights argument is accepted for interface
    # compatibility with rmse but is not used here.
    N = len(target)
    error = sum(np.abs(actual - target)) / N
    if jacobian_actual is not None:
        error_gradient = (np.sign(actual - target)[None, :] @ jacobian_actual) / N
        return error, np.squeeze(error_gradient)
    else:
        return error

def create_weighted_metric(name, binsize, number_of_bins, target):
    # Build a metric whose per-sample weights are balanced across target bins:
    # every occupied bin contributes equally to the loss, regardless of how
    # many samples fall into it.
    bins = np.linspace(0, number_of_bins * binsize, number_of_bins, endpoint=False)
    index = np.digitize(target, bins)
    weights = np.zeros(len(target))
    for ii in range(number_of_bins + 1):
        mask = ii == index
        n = np.sum(mask)
        if n == 0:
            continue
        weights[mask] = 1 / (n * number_of_bins)
    return create_metric(name, weights)

def create_metric(name, weights=None):
    if name == "rmse":
        func = rmse
    elif name == "mae":
        func = mae
    elif name == "huber":
        func = huber
    else:
        raise ValueError(f"unknown metric {name}")

    def _closure(target, actual, jacobian_actual=None):
        return func(target, actual, jacobian_actual, weights=weights)

    return _closure

def huber(target, actual, jacobian_actual=None, weights=None):
    # Huber loss with a fixed transition at |error| = 1: quadratic for small
    # errors, linear for large ones. The weights argument is accepted for
    # interface compatibility but is not used here.
    diff = actual - target
    delta = np.abs(diff)
    N = len(target)
    error = sum(np.where(delta < 1, delta**2 / 2, delta - 1 / 2)) / N
    if jacobian_actual is not None:
        vector = np.where(delta < 1, diff, np.sign(diff))
        error_gradient = (vector @ jacobian_actual) / N
        return error, np.squeeze(error_gradient)
    else:
        return error

def calibrate_wind_estimate_from_spectrum(
    method,
    target_u10: xarray.DataArray,
    spectrum: FrequencySpectrum,
    parameter_names: Optional[List[str]] = None,
    loss_function=None,
    velocity_scale=None,
    bounds=None,
    params=None,
):
    if parameter_names is None:
        parameter_names = [
            "phillips_constant_beta",
            "charnock_constant",
        ]

    estimate_default_parameters = {
        "directional_spreading_constant": 2.5,
        "phillips_constant_beta": 0.012,
        "charnock_constant": 0.015,
    }

    # The default value of each calibrated parameter is used as its scale, so
    # that the optimizer works with O(1) variables.
    scale = {}
    for parameter_name in parameter_names:
        scale[parameter_name] = estimate_default_parameters[parameter_name]

    if loss_function is None:
        loss_function = rmse

    if velocity_scale is None:
        velocity_scale = np.median(target_u10)

    if params is None:
        params = {}

    x0 = np.ones(len(parameter_names))

    def training_function(values):
        estimate_input = {}
        for parameter_name, value in zip(parameter_names, values):
            estimate_input[parameter_name] = value * scale[parameter_name]

        estimate_input = estimate_default_parameters | estimate_input | params
        estimate = estimate_u10_from_spectrum(spectrum, method=method, **estimate_input)

        actual = estimate["u10"].values / velocity_scale
        actual[np.isnan(actual)] = 0.0
        target = target_u10.values / velocity_scale
        return loss_function(target, actual)

    if bounds is None:
        _bounds = [(0.01, 100) for x in x0]
    else:
        # User-supplied bounds are given in physical units; convert them to the
        # scaled variables seen by the optimizer.
        _bounds = []
        for parameter_name in parameter_names:
            x = bounds[parameter_name]
            _scale = scale[parameter_name]
            _bounds.append((x[0] / _scale, x[1] / _scale))

    options = {"maxiter": 100, "disp": True}
    res = minimize(
        training_function, x0, method="SLSQP", bounds=_bounds, options=options
    )
    return {key: x * scale[key] for key, x in zip(parameter_names, res.x)}

def calibrate_wind_estimate_from_balance(
    balance: SourceTermBalance,
    parameter_names: List[str],
    target_u10: xarray.DataArray,
    spectrum: FrequencyDirectionSpectrum,
    loss_function=None,
    velocity_scale=None,
    params=None,
    time_derivative_spectrum: Optional[FrequencyDirectionSpectrum] = None,
    direction_iteration=False,
):
    dissipation = balance.dissipation
    generation = balance.generation

    if params is not None:
        balance.update_parameters(params)

    # Each calibrated parameter is scaled by its current value in the balance,
    # so that the optimizer works with O(1) variables.
    scale = {}
    for parameter_name in parameter_names:
        if parameter_name in dissipation._parameters:
            par = dissipation._parameters
        elif parameter_name in generation._parameters:
            par = generation._parameters
        else:
            raise ValueError(f"unknown parameter {parameter_name}")
        scale[parameter_name] = par[parameter_name]

    if loss_function is None:
        loss_function = rmse

    if velocity_scale is None:
        velocity_scale = np.median(target_u10)

    def training_function(values):
        """
        Training function that returns the current loss value and its gradient.

        :param values: list of scaled floats, in the order of the names in
            parameter_names
        :return: the loss value and the gradient of the loss with respect to
            the scaled parameters
        """
        params = {}
        for parameter_name, value in zip(parameter_names, values):
            params[parameter_name] = value * scale[parameter_name]
        balance.update_parameters(params)

        # Estimate the wind speed, direction and gradient.
        estimate = windspeed_and_direction_from_spectra(
            balance,
            target_u10,
            spectrum,
            jacobian=True,
            jacobian_parameters=parameter_names,
            time_derivative_spectrum=time_derivative_spectrum,
            direction_iteration=direction_iteration,
        )

        # Scale the estimate and zero out any non-finite values.
        actual = estimate["u10"].values / velocity_scale
        actual[~np.isfinite(actual)] = 0.0

        target = target_u10.values / velocity_scale
        jacobian = estimate["jacobian"].values / velocity_scale
        err, grad_err = loss_function(target, actual, jacobian_actual=jacobian)

        # Chain rule: convert the gradient with respect to the physical
        # parameters into the gradient with respect to the scaled variables.
        for index, parameter_name in enumerate(parameter_names):
            grad_err[index] = grad_err[index] * scale[parameter_name]

        return (err, grad_err)

    x0 = np.ones(len(parameter_names))
    bounds = [(0.01, 100) for x in x0]
    options = {"maxiter": 100, "disp": True}
    res = minimize(
        training_function,
        x0,
        method="SLSQP",
        bounds=bounds,
        options=options,
        jac=True,
    )
    return balance.get_parameters | {
        key: x * scale[key] for key, x in zip(parameter_names, res.x)
    }

def prep_data(
    spectrum: WaveSpectrum, target_u10: xarray.DataArray, threshold=(-np.inf, np.inf)
):
    mask = (
        (spectrum.is_valid())
        & (target_u10.values >= threshold[0])
        & (target_u10.values <= threshold[1])
    )
    return spectrum.where(mask), target_u10[mask.values]
Functions
def calibrate_wind_estimate_from_balance(balance: SourceTermBalance, parameter_names: List[str], target_u10: xarray.core.dataarray.DataArray, spectrum: FrequencyDirectionSpectrum, loss_function=None, velocity_scale=None, params=None, time_derivative_spectrum: Optional[FrequencyDirectionSpectrum] = None, direction_iteration=False)
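Calibrates the named parameters of a SourceTermBalance by minimizing the mismatch between the observed and estimated u10 with SLSQP, using the jacobian returned by windspeed_and_direction_from_spectra. The sketch below is illustrative only: it assumes a SourceTermBalance balance, a collocated FrequencyDirectionSpectrum spectrum and a matching target_u10 DataArray already exist, and the parameter name shown is a placeholder that must be a key of the balance's dissipation or generation parameters.

from ocean_science_utilities.wavephysics.train_wind_estimate import (
    calibrate_wind_estimate_from_balance,
    create_metric,
)

# Sketch only: `balance`, `spectrum` and `target_u10` are assumed to exist;
# the parameter name is a placeholder.
fitted = calibrate_wind_estimate_from_balance(
    balance=balance,
    parameter_names=["some_dissipation_parameter"],
    target_u10=target_u10,
    spectrum=spectrum,
    loss_function=create_metric("huber"),
)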
def calibrate_wind_estimate_from_spectrum(method, target_u10: xarray.core.dataarray.DataArray, spectrum: FrequencySpectrum, parameter_names: Optional[List[str]] = None, loss_function=None, velocity_scale=None, bounds=None, params=None)
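Calibrates the parametric wind estimate from a 1D frequency spectrum by minimizing the loss between the output of estimate_u10_from_spectrum and the observed u10. The sketch below assumes a collocated FrequencySpectrum spectrum and target_u10 DataArray; the method string and the bound values are placeholders and must be accepted by estimate_u10_from_spectrum.

from ocean_science_utilities.wavephysics.train_wind_estimate import (
    calibrate_wind_estimate_from_spectrum,
    create_metric,
)

# Sketch only: `spectrum` and `target_u10` are assumed to exist; the method
# string and the bounds are placeholders.
fitted = calibrate_wind_estimate_from_spectrum(
    method="peak",
    target_u10=target_u10,
    spectrum=spectrum,
    parameter_names=["phillips_constant_beta", "charnock_constant"],
    loss_function=create_metric("mae"),
    bounds={
        "phillips_constant_beta": (0.001, 0.1),
        "charnock_constant": (0.001, 0.1),
    },
)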
def create_metric(name, weights=None)
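Returns a closure with the signature loss(target, actual, jacobian_actual=None) that forwards to rmse, mae or huber with the given per-sample weights baked in. A small, self-contained example:

import numpy as np
from ocean_science_utilities.wavephysics.train_wind_estimate import create_metric

target = np.array([4.0, 8.0, 12.0])
actual = np.array([5.0, 7.5, 13.0])

# Equal weights that sum to one; these are passed through to rmse.
weights = np.full(len(target), 1 / len(target))
loss_function = create_metric("rmse", weights=weights)
loss = loss_function(target, actual)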
def create_weighted_metric(name, binsize, number_of_bins, target)
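Builds a metric whose weights are balanced over target bins of width binsize, so that sparsely populated bins (e.g. high wind speeds) contribute as much to the loss as densely populated ones. A small, self-contained example with illustrative numbers:

import numpy as np
from ocean_science_utilities.wavephysics.train_wind_estimate import create_weighted_metric

# Wind speeds concentrated at low values plus a single high-wind sample.
target = np.array([2.0, 3.0, 3.5, 4.0, 18.0])
actual = np.array([2.5, 2.5, 4.0, 4.5, 15.0])

loss_function = create_weighted_metric("rmse", binsize=5.0, number_of_bins=4, target=target)
loss = loss_function(target, actual)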
def huber(target, actual, jacobian_actual=None, weights=None)
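Huber loss with a fixed transition at an absolute error of 1: quadratic below it, linear above it. The weights argument is currently not used. A small example; the values in the comment follow directly from the definition above:

import numpy as np
from ocean_science_utilities.wavephysics.train_wind_estimate import huber

target = np.zeros(4)
actual = np.array([0.2, -0.5, 2.0, -3.0])

# Per-sample contributions: 0.02, 0.125, 1.5, 2.5 -> mean of 1.03625.
error = huber(target, actual)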
def mae(target, actual, jacobian_actual=None, weights=None)
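Mean absolute error; the weights argument is accepted but not used. A small, self-contained example:

import numpy as np
from ocean_science_utilities.wavephysics.train_wind_estimate import mae

target = np.array([5.0, 7.0, 9.0])
actual = np.array([5.5, 6.0, 9.5])

error = mae(target, actual)  # (0.5 + 1.0 + 0.5) / 3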
def prep_data(spectrum: WaveSpectrum, target_u10: xarray.core.dataarray.DataArray, threshold=(-inf, inf))
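Masks out entries with an invalid spectrum or a target wind speed outside the given (lower, upper) threshold and returns the filtered spectrum and target. The sketch below assumes a WaveSpectrum spectrum collocated with a target_u10 DataArray; the threshold values are placeholders.

from ocean_science_utilities.wavephysics.train_wind_estimate import prep_data

# Sketch only: `spectrum` and `target_u10` are assumed to exist.
spectrum, target_u10 = prep_data(spectrum, target_u10, threshold=(0.0, 40.0))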
def rmse(target, actual, jacobian_actual=None, weights=None)
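Despite the name, this returns half of the weighted sum of squared errors (half the mean squared error for the default weights of 1/N), together with its gradient when a jacobian of the estimate with respect to the calibration parameters is supplied. A small, self-contained example with a hypothetical two-parameter jacobian:

import numpy as np
from ocean_science_utilities.wavephysics.train_wind_estimate import rmse

target = np.array([5.0, 7.0, 9.0])
actual = np.array([5.5, 6.0, 9.5])

loss = rmse(target, actual)

# With an (n_samples, n_parameters) jacobian of `actual`, the gradient of the
# loss with respect to the parameters is returned as well.
jacobian = np.array([[1.0, 0.0], [0.5, 0.5], [0.0, 1.0]])
loss, gradient = rmse(target, actual, jacobian_actual=jacobian)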