"""tkminfit.py
scipy.optimize.minimize ベースの最小二乗共通ランナー。
既存の tknlsq.nonlinear_lsq は scipy.optimize.least_squares 用。
このモジュールは、Nelder-Mead など minimize 系メソッドを使いたい
アプリ用の薄い共通層。
主な用途:
- optid=1 の非線形パラメータ最適化
- optid_lin=1 の線形ブロック最適化
- variable projection: 非線形パラメータごとに線形パラメータを内部で解く
- 数値Jacobianから共分散と標準誤差を推定
:doc:`tkminfit_usage`
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Mapping, Optional, Sequence, Tuple
import numpy as np
import tklsq as tkcore
import tkparamio as tkparamio
from tknlsq import numerical_jacobian, covariance_from_jacobian
#import tklsq.tklsq as tkcore
#import tklsq.tkparamio as tkparamio
#from tklsq.tknlsq import numerical_jacobian, covariance_from_jacobian
ArrayLike = Sequence[float] | np.ndarray
ParamValues = Dict[str, float]
[ドキュメント]
@dataclass
class MinimizeLSQResult:
"""minimize ベースの最小二乗最適化結果を格納するデータクラス。
最適化プロセスで得られたパラメータ、残差、共分散、標準誤差などの情報を保持します。
variable projection を使用した場合、線形最適化の結果も含まれます。
:ivar params: Dict[str, float]: 最適化後の全パラメータの辞書。
:ivar params_free: np.ndarray: 最適化された(自由な)パラメータの値のNumPy配列。
:ivar names: List[str]: 全パラメータ名のリスト。
:ivar free_names: List[str]: 最適化された(自由な)パラメータ名のリスト。
:ivar optimized_names: List[str]: `minimize_lsq` では `free_names` と同じ。`variable_projection_lsq` では非線形パラメータ名。
:ivar linear_names: List[str]: 線形パラメータ名のリスト。
:ivar residuals: np.ndarray: 最適化後の残差ベクトル。
:ivar jacobian: Optional[np.ndarray]: 最適化後のヤコビアン行列(自由パラメータに対する残差の偏微分)。共分散計算のために推定されます。
:ivar cov_free: Optional[np.ndarray]: 自由パラメータの共分散行列。
:ivar stderr_free: Optional[np.ndarray]: 自由パラメータの標準誤差ベクトル。
:ivar stderr: Dict[str, Optional[float]]: 全パラメータの標準誤差の辞書。最適化されないパラメータや共分散計算ができない場合はNone。
:ivar sigma2_resid: Optional[float]: 残差分散の推定値 (s^2)。
:ivar dof: int: 自由度 (N - p_free)。
:ivar N: int: データ点数(残差の要素数)。
:ivar p_free: int: 自由パラメータ数。
:ivar RSS: float: 残差平方和。
:ivar objective: float: 目的関数値(RSS + penalty)。
:ivar success: bool: 最適化が成功したかどうかの真偽値。
:ivar message: str: 最適化プロセスの終了メッセージ。
:ivar nfev: Optional[int]: 関数評価回数(最小化アルゴリズムにより提供される場合)。
:ivar nit: Optional[int]: イテレーション回数(最小化アルゴリズムにより提供される場合)。
:ivar warning: str: 共分散計算に関する警告メッセージ。
:ivar raw_result: object: `scipy.optimize.minimize` から返される生の最適化結果オブジェクト。
:ivar linear_result: object: `variable_projection_lsq` で使用される線形最小二乗の生の結果オブジェクト。
"""
params: Dict[str, float]
params_free: np.ndarray
names: List[str]
free_names: List[str]
optimized_names: List[str]
linear_names: List[str]
residuals: np.ndarray
jacobian: Optional[np.ndarray]
cov_free: Optional[np.ndarray]
stderr_free: Optional[np.ndarray]
stderr: Dict[str, Optional[float]]
sigma2_resid: Optional[float]
dof: int
N: int
p_free: int
RSS: float
objective: float
success: bool
message: str
nfev: Optional[int] = None
nit: Optional[int] = None
warning: str = ""
raw_result: object = field(default=None, repr=False)
linear_result: object = field(default=None, repr=False)
@property
def sigma_resid(self) -> Optional[float]:
"""残差の標準偏差 (s) を計算して返します。
:returns: float: 残差の標準偏差。`sigma2_resid` がNoneの場合はNone。
"""
if self.sigma2_resid is None:
return None
return float(np.sqrt(max(self.sigma2_resid, 0.0)))
[ドキュメント]
def pack_values(values: Mapping[str, float], names: Sequence[str]) -> np.ndarray:
"""指定された名前リストに従い、パラメータ値をNumPy配列にパックします。
:param values: Mapping[str, float]: パラメータ名と値の辞書。
:param names: Sequence[str]: パックするパラメータ名の順序指定リスト。
:returns: np.ndarray: パックされたパラメータ値のNumPy配列。
"""
return np.array([float(values[n]) for n in names], dtype=float)
[ドキュメント]
def unpack_values(
p_free: ArrayLike,
base_values: Mapping[str, float],
free_names: Sequence[str],
) -> Dict[str, float]:
"""自由パラメータのベクトルを基となるパラメータ辞書に展開し、新しい辞書を返します。
`base_values` のコピーを作成し、`free_names` に対応する値を `p_free` から上書きします。
:param p_free: ArrayLike: 自由パラメータの値の配列またはリスト。
:param base_values: Mapping[str, float]: 基となる全パラメータの辞書。
:param free_names: Sequence[str]: 自由パラメータ名のリスト。`p_free` の順序と対応している必要があります。
:returns: Dict[str, float]: 自由パラメータが展開された新しい全パラメータの辞書。
"""
out = {k: float(v) for k, v in base_values.items()}
for name, value in zip(free_names, np.asarray(p_free, dtype=float).reshape(-1)):
out[name] = float(value)
return out
[ドキュメント]
def residuals_to_objective(
residuals: ArrayLike,
*,
penalty: float = 0.0,
) -> float:
"""残差ベクトルから目的関数値(RSS + penalty)を計算します。
残差に非有限な値(NaN, Inf)が含まれる場合、それらを非常に大きな値 (1.0e100)
として扱い、目的関数値を大きくすることで最適化から除外します。
:param residuals: ArrayLike: 残差の配列またはリスト。
:param penalty: float: 目的関数に加算するペナルティ値(デフォルト: 0.0)。
:returns: float: 計算された目的関数値。
"""
r = np.asarray(residuals, dtype=float).reshape(-1)
r = np.where(np.isfinite(r), r, 1.0e100)
return float(r @ r + penalty)
[ドキュメント]
def solve_linear_block(
y: ArrayLike,
params: Mapping[str, Mapping[str, object]],
p_current: Mapping[str, float],
design_matrix_func: Callable[[Mapping[str, float], Sequence[str]], np.ndarray],
*,
lin_names: Optional[Sequence[str]] = None,
weights: Optional[ArrayLike] = None,
) -> Tuple[Dict[str, float], object]:
"""現在の非線形パラメータのもとで、線形パラメータを線形最小二乗法で最適化します。
`design_matrix_func` は、現在のパラメータと線形パラメータ名のリストを受け取り、
デザイン行列 `X` を返すことを想定しています。これにより、観測データ `y` と
`X` を用いて線形パラメータを最小二乗で解きます。
`design_matrix_func` は以下の形を想定します。
X = design_matrix_func(p_current, lin_names)
たとえばアプリ側で `x` をクロージャとして持たせます。
def design_matrix_func(p, lin_names):
return make_X(x, p, lin_names)
:param y: ArrayLike: 観測データベクトル。
:param params: Mapping[str, Mapping[str, object]]: パラメータ定義情報(例: `tkparamio.read_param_csv` の戻り値形式)。
:param p_current: Mapping[str, float]: 現在の全パラメータ値の辞書(非線形パラメータを含む)。
:param design_matrix_func: Callable[[Mapping[str, float], Sequence[str]], np.ndarray]:
現在の非線形パラメータと線形パラメータ名リストに基づいてデザイン行列 `X` を生成する関数。
:param lin_names: Optional[Sequence[str]]: 線形パラメータ名のリスト。指定しない場合、`tkparamio` から自動的に取得されます。
:param weights: Optional[ArrayLike]: 各データ点に対する重みベクトル。`tklsq.tklsq.linear_lsq` に渡されます。
:returns: Tuple[Dict[str, float], object]:
- Dict[str, float]: 線形パラメータが更新された全パラメータの辞書。
- object: `tklsq.tklsq.linear_lsq` から返される線形最小二乗の生の結果オブジェクト。
:raises ValueError: `design_matrix_func` が不正な形状の配列を返した場合や、次元の不整合があった場合。
"""
if lin_names is None:
lin_names = tkparamio.linear_param_names(params)
lin_names = list(lin_names)
if not lin_names:
return {k: float(v) for k, v in p_current.items()}, None
y_arr = np.asarray(y, dtype=float).reshape(-1)
X = np.asarray(design_matrix_func(p_current, lin_names), dtype=float)
if X.ndim != 2:
raise ValueError("design_matrix_func must return a 2D array")
if X.shape[0] != y_arr.size:
raise ValueError("design matrix row count and y size mismatch")
if X.shape[1] != len(lin_names):
raise ValueError("design matrix column count must match lin_names")
lin_res = tkcore.linear_lsq(X, y_arr, weights=weights)
p_new = {k: float(v) for k, v in p_current.items()}
for name, beta in zip(lin_names, lin_res.beta):
p_new[name] = float(beta)
return p_new, lin_res
[ドキュメント]
def estimate_covariance_for_params(
residual_func: Callable[[Mapping[str, float]], ArrayLike],
params_fit: Mapping[str, float],
free_names: Sequence[str],
*,
rel_step: float = 1.0e-6,
abs_step: float = 1.0e-12,
) -> Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray], Dict[str, Optional[float]], Optional[float], int, str]:
"""任意の残差関数 `residual_func(params)` について、`free_names` で指定された自由パラメータの
共分散行列、標準誤差、および残差分散を推定します。
ヤコビアン行列は数値的に計算されます。
:param residual_func: Callable[[Mapping[str, float]], ArrayLike]:
パラメータ辞書を受け取り残差ベクトルを返す関数。
:param params_fit: Mapping[str, float]: 最適化後の全パラメータ値の辞書。
:param free_names: Sequence[str]: 共分散を推定する自由パラメータ名のリスト。
:param rel_step: float: 数値ヤコビアン計算のための相対ステップサイズ。
:param abs_step: float: 数値ヤコビアン計算のための絶対ステップサイズ。
:returns: Tuple[np.ndarray, Optional[np.ndarray], Optional[np.ndarray], Dict[str, Optional[float]], Optional[float], int, str]:
- np.ndarray: 数値的に計算されたヤコビアン行列。
- Optional[np.ndarray]: 自由パラメータの共分散行列。計算できなかった場合はNone。
- Optional[np.ndarray]: 自由パラメータの標準誤差ベクトル。計算できなかった場合はNone。
- Dict[str, Optional[float]]: 全パラメータの標準誤差の辞書。最適化されないパラメータや計算できなかった場合はNone。
- Optional[float]: 残差分散の推定値 (s^2)。計算できなかった場合はNone。
- int: 自由度 (N - p_free)。
- str: 共分散計算に関する警告メッセージ。
"""
free_names = list(free_names)
p_free = pack_values(params_fit, free_names)
def r_of_free(v):
p = unpack_values(v, params_fit, free_names)
r = np.asarray(residual_func(p), dtype=float).reshape(-1)
return np.where(np.isfinite(r), r, 1.0e100)
residuals = r_of_free(p_free)
J = numerical_jacobian(r_of_free, p_free, rel_step=rel_step, abs_step=abs_step)
dof = residuals.size - len(free_names)
cov, stderr_free, sigma2, warning = covariance_from_jacobian(residuals, J, dof=dof)
stderr: Dict[str, Optional[float]] = {name: None for name in params_fit.keys()}
if stderr_free is not None:
for name, se in zip(free_names, stderr_free):
stderr[name] = float(se)
return J, cov, stderr_free, stderr, sigma2, dof, warning
[ドキュメント]
def minimize_lsq(
residual_func: Callable[[Mapping[str, float]], ArrayLike],
params: Mapping[str, Mapping[str, object]],
*,
free_names: Optional[Sequence[str]] = None,
method: str = "Nelder-Mead",
use_penalty: bool = True,
maxiter: Optional[int] = None,
options: Optional[dict] = None,
callback: Optional[Callable[[int, Dict[str, float], float], None]] = None,
print_interval: int = 0,
rel_step: float = 1.0e-6,
abs_step: float = 1.0e-12,
) -> MinimizeLSQResult:
"""`scipy.optimize.minimize` を用いて、残差平方和(RSS)にペナルティ項を加えた目的関数を最小化します。
この関数は、与えられた `residual_func` を直接最適化します。
パラメータの境界制約はペナルティ項として目的関数に組み込まれることで処理されます。
`residual_func`:
パラメータ辞書を受け取り、残差ベクトルを返す関数。
例: `residual_func(params_dict) -> residual vector`
`params`:
`tkparamio.read_param_csv()` の戻り値形式で、全パラメータの定義情報。
ここから初期値、境界、`optid` などの情報が読み取られます。
:param residual_func: Callable[[Mapping[str, float]], ArrayLike]:
パラメータ辞書を受け取り、残差ベクトルを返す関数。
:param params: Mapping[str, Mapping[str, object]]:
パラメータ定義情報(`tkparamio.read_param_csv` の戻り値形式)。
:param free_names: Optional[Sequence[str]]:
最適化する自由パラメータ名のリスト。指定しない場合、`tkparamio.nonlinear_param_names(params)` から取得されます。
:param method: str:
`scipy.optimize.minimize` で使用する最適化メソッド。デフォルトは "Nelder-Mead"。
:param use_penalty: bool:
パラメータの境界制約に対するペナルティ項を目的関数に含めるかどうかのフラグ。デフォルトはTrue。
:param maxiter: Optional[int]:
最大イテレーション回数。`options` に `maxiter` が指定されていない場合に適用されます。
:param options: Optional[dict]:
`scipy.optimize.minimize` に渡す追加オプションの辞書。
:param callback: Optional[Callable[[int, Dict[str, float], float], None]]:
各イテレーション後に呼び出されるコールバック関数。
引数: (イテレーション回数, 現在の全パラメータ辞書, 現在の目的関数値)。
:param print_interval: int:
進捗状況をコンソールに出力するイテレーション間隔。0の場合は出力しません。
:param rel_step: float:
共分散計算のための数値ヤコビアン計算の相対ステップサイズ。
:param abs_step: float:
共分散計算のための数値ヤコビアン計算の絶対ステップサイズ。
:returns: MinimizeLSQResult: 最適化結果を格納したデータクラス。
:raises ImportError: `scipy` がインストールされていない場合。
"""
try:
from scipy.optimize import minimize
except Exception as exc: # pragma: no cover
raise ImportError("minimize_lsq requires scipy. Install with: pip install scipy") from exc
base_values = tkparamio.values_from_params(params)
if free_names is None:
free_names = tkparamio.nonlinear_param_names(params)
free_names = list(free_names)
x0 = pack_values(base_values, free_names)
counter = {"n": 0}
def residual_from_vector(v):
p = unpack_values(v, base_values, free_names)
r = np.asarray(residual_func(p), dtype=float).reshape(-1)
return np.where(np.isfinite(r), r, 1.0e100)
def objective(v):
p = unpack_values(v, base_values, free_names)
r = residual_from_vector(v)
penalty = tkparamio.bounds_penalty(params, p, names=free_names) if use_penalty else 0.0
return residuals_to_objective(r, penalty=penalty)
def scipy_callback(v):
counter["n"] += 1
obj = objective(v)
p = unpack_values(v, base_values, free_names)
if print_interval > 0 and counter["n"] % print_interval == 0:
print(f"[iter {counter['n']:5d}] objective={obj:.10g}, params={p}")
if callback is not None:
callback(counter["n"], p, obj)
if options is None:
options = {}
if maxiter is not None and "maxiter" not in options:
options = dict(options)
options["maxiter"] = maxiter
raw = minimize(
objective,
x0,
method=method,
callback=scipy_callback,
options=options,
)
p_fit = unpack_values(raw.x, base_values, free_names)
residuals = residual_from_vector(raw.x)
RSS = float(residuals @ residuals)
J, cov, stderr_free, stderr, sigma2, dof, warning = estimate_covariance_for_params(
residual_func,
p_fit,
free_names,
rel_step=rel_step,
abs_step=abs_step,
)
return MinimizeLSQResult(
params=p_fit,
params_free=pack_values(p_fit, free_names),
names=list(params.keys()),
free_names=free_names,
optimized_names=free_names,
linear_names=[],
residuals=residuals,
jacobian=J,
cov_free=cov,
stderr_free=stderr_free,
stderr=stderr,
sigma2_resid=sigma2,
dof=dof,
N=residuals.size,
p_free=len(free_names),
RSS=RSS,
objective=float(raw.fun),
success=bool(raw.success),
message=str(raw.message),
nfev=getattr(raw, "nfev", None),
nit=getattr(raw, "nit", None),
warning=warning,
raw_result=raw,
)
[ドキュメント]
def variable_projection_lsq(
y: ArrayLike,
params: Mapping[str, Mapping[str, object]],
model_func: Callable[[Mapping[str, float]], ArrayLike],
design_matrix_func: Callable[[Mapping[str, float], Sequence[str]], np.ndarray],
*,
nonlin_names: Optional[Sequence[str]] = None,
lin_names: Optional[Sequence[str]] = None,
method: str = "Nelder-Mead",
use_penalty: bool = True,
weights: Optional[ArrayLike] = None,
maxiter: Optional[int] = None,
options: Optional[dict] = None,
callback: Optional[Callable[[int, Dict[str, float], float], None]] = None,
print_interval: int = 0,
rel_step: float = 1.0e-6,
abs_step: float = 1.0e-12,
) -> MinimizeLSQResult:
"""変数投影法(Variable Projection)を用いて、非線形パラメータ探索中に線形パラメータを
線形最小二乗法で内部的に解くことで、目的関数を最小化します。
このアプローチにより、非線形最適化の対象となるパラメータ数を減らし、安定性と効率を向上させます。
線形パラメータは、現在の非線形パラメータの値に基づいて各イテレーションで再計算されます。
`model_func`:
観測データ `y` と比較するために、現在の全パラメータ(非線形および線形)を受け取り、
モデル予測値を返す関数。
例: `y_fit = model_func(params_dict)`
`design_matrix_func`:
現在の非線形パラメータと線形パラメータ名のリストを受け取り、デザイン行列 `X` を返す関数。
例: `X = design_matrix_func(params_dict, lin_names)`
どちらの関数も、例えば独立変数 `x` などをアプリ側のクロージャとして保持する想定です。
:param y: ArrayLike: 観測データベクトル。
:param params: Mapping[str, Mapping[str, object]]:
パラメータ定義情報(`tkparamio.read_param_csv` の戻り値形式)。
:param model_func: Callable[[Mapping[str, float]], ArrayLike]:
現在のパラメータ辞書に基づいてモデルの予測値ベクトルを返す関数。
:param design_matrix_func: Callable[[Mapping[str, float], Sequence[str]], np.ndarray]:
現在の非線形パラメータと線形パラメータ名リストに基づいてデザイン行列 `X` を生成する関数。
:param nonlin_names: Optional[Sequence[str]]:
最適化する非線形パラメータ名のリスト。指定しない場合、`tkparamio.nonlinear_param_names(params)` から取得されます。
:param lin_names: Optional[Sequence[str]]:
最適化する線形パラメータ名のリスト。指定しない場合、`tkparamio.linear_param_names(params)` から取得されます。
:param method: str:
`scipy.optimize.minimize` で使用する最適化メソッド。デフォルトは "Nelder-Mead"。
:param use_penalty: bool:
パラメータの境界制約に対するペナルティ項を目的関数に含めるかどうかのフラグ。デフォルトはTrue。
:param weights: Optional[ArrayLike]:
線形最小二乗ソルバーに渡される重みベクトル。
:param maxiter: Optional[int]:
最大イテレーション回数。`options` に `maxiter` が指定されていない場合に適用されます。
:param options: Optional[dict]:
`scipy.optimize.minimize` に渡す追加オプションの辞書。
:param callback: Optional[Callable[[int, Dict[str, float], float], None]]:
各イテレーション後に呼び出されるコールバック関数。
引数: (イテレーション回数, 現在の全パラメータ辞書, 現在の目的関数値)。
:param print_interval: int:
進捗状況をコンソールに出力するイテレーション間隔。0の場合は出力しません。
:param rel_step: float:
共分散計算のための数値ヤコビアン計算の相対ステップサイズ。
:param abs_step: float:
共分散計算のための数値ヤコビアン計算の絶対ステップサイズ。
:returns: MinimizeLSQResult: 最適化結果を格納したデータクラス。
:raises ImportError: `scipy` がインストールされていない場合。
:raises ValueError: `model_func` の出力サイズと `y` のサイズが一致しない場合。
"""
try:
from scipy.optimize import minimize
except Exception as exc: # pragma: no cover
raise ImportError("variable_projection_lsq requires scipy. Install with: pip install scipy") from exc
y_arr = np.asarray(y, dtype=float).reshape(-1)
base_values = tkparamio.values_from_params(params)
if nonlin_names is None:
nonlin_names = tkparamio.nonlinear_param_names(params)
if lin_names is None:
lin_names = tkparamio.linear_param_names(params)
nonlin_names = list(nonlin_names)
lin_names = list(lin_names)
x0 = pack_values(base_values, nonlin_names)
counter = {"n": 0}
last_linear_result = {"res": None}
def complete_params_from_nonlin(v):
p = unpack_values(v, base_values, nonlin_names)
p2, lin_res = solve_linear_block(
y_arr,
params,
p,
design_matrix_func,
lin_names=lin_names,
weights=weights,
)
last_linear_result["res"] = lin_res
return p2
def residual_from_params(p):
y_fit = np.asarray(model_func(p), dtype=float).reshape(-1)
if y_fit.size != y_arr.size:
raise ValueError("model_func output size and y size mismatch")
r = y_arr - y_fit
return np.where(np.isfinite(r), r, 1.0e100)
def objective(v):
p = complete_params_from_nonlin(v)
r = residual_from_params(p)
penalty_names = list(dict.fromkeys(nonlin_names + lin_names))
penalty = tkparamio.bounds_penalty(params, p, names=penalty_names) if use_penalty else 0.0
return residuals_to_objective(r, penalty=penalty)
def scipy_callback(v):
counter["n"] += 1
p = complete_params_from_nonlin(v)
obj = objective(v)
if print_interval > 0 and counter["n"] % print_interval == 0:
shown = {name: p[name] for name in list(dict.fromkeys(nonlin_names + lin_names))}
print(f"[iter {counter['n']:5d}] objective={obj:.10g}, params={shown}")
if callback is not None:
callback(counter["n"], p, obj)
if options is None:
options = {}
if maxiter is not None and "maxiter" not in options:
options = dict(options)
options["maxiter"] = maxiter
raw = minimize(
objective,
x0,
method=method,
callback=scipy_callback,
options=options,
)
p_fit = complete_params_from_nonlin(raw.x)
residuals = residual_from_params(p_fit)
RSS = float(residuals @ residuals)
free_names = list(dict.fromkeys(nonlin_names + lin_names))
def residual_for_cov(p):
return residual_from_params(p)
J, cov, stderr_free, stderr, sigma2, dof, warning = estimate_covariance_for_params(
residual_for_cov,
p_fit,
free_names,
rel_step=rel_step,
abs_step=abs_step,
)
return MinimizeLSQResult(
params=p_fit,
params_free=pack_values(p_fit, free_names),
names=list(params.keys()),
free_names=free_names,
optimized_names=nonlin_names,
linear_names=lin_names,
residuals=residuals,
jacobian=J,
cov_free=cov,
stderr_free=stderr_free,
stderr=stderr,
sigma2_resid=sigma2,
dof=dof,
N=residuals.size,
p_free=len(free_names),
RSS=RSS,
objective=float(raw.fun),
success=bool(raw.success),
message=str(raw.message),
nfev=getattr(raw, "nfev", None),
nit=getattr(raw, "nit", None),
warning=warning,
raw_result=raw,
linear_result=last_linear_result["res"],
)