regression.lsq_func_error_argparse のソースコード

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
lsq_func_error_argparse.py

汎用的な非線形最小二乗フィッティングプログラム。

このスクリプトは、与えられたモデル関数とデータに対して非線形最小二乗フィッティングを実行し、
パラメータの標準誤差と信頼区間を推定します。また、結果をExcelファイルに保存し、
フィッティングの進捗と最終結果をグラフで表示します。

特徴
--------
- モデル関数はコマンドラインから文字列として与えられます(例: `--func "p[0]*exp(-((x[0]-p[1])/p[2])**2)"`)。
- `--xlabels` を通じて複数の独立変数をサポートします。
- `scipy.optimize.minimize` を使用してパラメータを最適化できます。
- パラメータの標準誤差は数値ヤコビアンから推定されます。
- デルタ法によって信頼帯/不確実性帯が計算されます。
- 結果はExcelファイルに保存されます。

注意事項
-----
このスクリプトは、ローカルでの研究利用を意図しています。
式評価には `eval()` を使用しますが、グローバル変数は制限されています。
信頼できない式は渡さないでください。

:doc:`lsq_func_error_argparse_usage`
"""

import sys
import os
import argparse
import types
import traceback
from typing import List, Tuple, Dict, Any, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.stats import t as student_t

# ----------------------------------------------------------------------
# Optional tkProg/tklib helpers
# ----------------------------------------------------------------------
try:
    from tklib.tkapplication import tkApplication
except Exception:
    tkApplication = None

try:
    from tklib.tkvariousdata import tkVariousData
except Exception:
    tkVariousData = None


# ----------------------------------------------------------------------
# Small utilities
# ----------------------------------------------------------------------
[ドキュメント] def pause_if_needed(pause: int) -> None: """ 必要に応じてプログラムの実行を一時停止します。 :param pause: int 0以外の場合に一時停止するフラグ。 :returns: None """ if int(pause): input("\nPress ENTER to terminate >> ")
[ドキュメント] def terminate(message: str, pause: int = 1, code: int = 1) -> None: """ エラーメッセージを表示し、プログラムを終了します。 指定されたメッセージを表示し、必要に応じて一時停止した後、 指定された終了コードでプロセスを終了します。 :param message: str 表示するエラーメッセージ。 :param pause: int (オプション) 終了前に一時停止するかどうかのフラグ (デフォルトは1)。 :param code: int (オプション) プロセス終了コード (デフォルトは1)。 :returns: None (実際にはsys.exitにより終了) """ if message: print(message) pause_if_needed(pause) sys.exit(code)
[ドキュメント] def replace_path(path: str, suffix: str) -> str: """ 指定されたパスのファイル名部分にサフィックスを追加した新しいパスを返します。 パスからディレクトリ名とファイル名本体を抽出し、 ファイル名本体にサフィックスを追加して結合します。 :param path: str 元のファイルパス。 :param suffix: str ファイル名に追加するサフィックス。 :returns: str サフィックスが追加された新しいファイルパス。 """ d = os.path.dirname(path) b = os.path.splitext(os.path.basename(path))[0] return os.path.join(d, b + suffix)
[ドキュメント] def parse_float_list(text: str, name: str) -> List[float]: """ カンマ区切りの文字列を浮動小数点数のリストにパースします。 空の文字列やスペースのみの要素は無視されます。 パースに失敗した場合、ValueErrorを発生させます。 :param text: str パースするカンマ区切りの文字列。 :param name: str エラーメッセージに含める変数名。 :returns: List[float] パースされた浮動小数点数のリスト。 :raises ValueError: テキストを浮動小数点数のリストとしてパースできなかった場合。 """ try: return [float(s.strip()) for s in text.split(',') if s.strip() != ''] except Exception as exc: raise ValueError(f"Could not parse {name} as comma-separated floats: {text}") from exc
[ドキュメント] def parse_str_list(text: str) -> List[str]: """ カンマ区切りの文字列を文字列のリストにパースします。 各要素の前後の空白は取り除かれ、空の文字列やスペースのみの要素は無視されます。 :param text: str パースするカンマ区切りの文字列。 :returns: List[str] パースされた文字列のリスト。 """ return [s.strip() for s in text.split(',') if s.strip() != '']
[ドキュメント] def parse_ranges(text: str, n: int, default_min: float = -1.0e100, default_max: float = 1.0e100) -> List[Tuple[float, float]]: """ 'xmin:xmax'形式の範囲指定文字列をパースし、浮動小数点数のタプルのリストを返します。 `text` が空または `'*'` の場合、全ての変数に対してデフォルトの最小値・最大値を使用します。 単一の範囲が指定され、`n` が1より大きい場合、その範囲が `n` 回繰り返されます。 各範囲は `xmin:xmax` 形式である必要があります。`*` または空はデフォルト値として扱われます。 :param text: str パースする範囲指定文字列(例: '-1:1, 0:100')。 :param n: int 期待される範囲の数(独立変数の数)。 :param default_min: float (オプション) デフォルトの最小値(デフォルトは非常に小さい値)。 :param default_max: float (オプション) デフォルトの最大値(デフォルトは非常に大きい値)。 :returns: List[Tuple[float, float]] 各変数に対する (最小値, 最大値) のタプルのリスト。 :raises ValueError: 範囲の数がnと一致しない、または形式が不正な場合。 """ if text is None or str(text).strip() in ('', '*'): return [(default_min, default_max) for _ in range(n)] items = parse_str_list(text) if len(items) == 1 and n > 1: items = items * n if len(items) != n: raise ValueError(f"Number of ranges must be 1 or nvar={n}: {text}") out = [] for item in items: if ':' not in item: raise ValueError(f"Range must be xmin:xmax format: {item}") a, b = item.split(':', 1) xmin = default_min if a.strip() in ('', '*') else float(a) xmax = default_max if b.strip() in ('', '*') else float(b) out.append((xmin, xmax)) return out
[ドキュメント] def parse_bounds(text: str, n: int) -> Optional[List[Tuple[Optional[float], Optional[float]]]]: """ 'lower:upper'形式の最適化境界指定文字列をパースし、浮動小数点数またはNoneのタプルのリストを返します。 `text` が空、`'*'`、`'none'`、または `'None'` の場合、境界なしと見なされます(`None` を返します)。 単一の境界が指定され、`n` が1より大きい場合、その境界が `n` 回繰り返されます。 各境界は `lower:upper` 形式である必要があります。`*` または空は `None` として扱われます。 :param text: str パースする境界指定文字列(例: '0:*, *:10, -5:5')。 :param n: int 期待される境界の数(パラメータの数)。 :returns: Optional[List[Tuple[Optional[float], Optional[float]]]] 各パラメータに対する (下限, 上限) のタプルのリスト、または `None`。 :raises ValueError: 境界の数がnと一致しない、または形式が不正な場合。 """ if text is None or str(text).strip() in ('', '*', 'none', 'None'): return None items = parse_str_list(text) if len(items) == 1 and n > 1: items = items * n if len(items) != n: raise ValueError(f"Number of bounds must be 1 or nparam={n}: {text}") out = [] for item in items: if ':' not in item: raise ValueError(f"Bound must be lower:upper format: {item}") a, b = item.split(':', 1) lo = None if a.strip() in ('', '*') else float(a) hi = None if b.strip() in ('', '*') else float(b) out.append((lo, hi)) return out
[ドキュメント] def parse_fixed(text: str, n: int) -> np.ndarray: """ 固定するパラメータのインデックス指定文字列をパースし、ブール値のマスク配列を返します。 `text` が空、`'*'`、`'none'`、または `'None'` の場合、全てのパラメータは固定されないと見なされます。 カンマ区切りのインデックスリストとして指定します(例: '0,2')。 指定されたインデックスが範囲外の場合、ValueErrorを発生させます。 :param text: str 固定するパラメータのインデックス文字列。 :param n: int パラメータの総数。 :returns: numpy.ndarray n要素を持つブール配列。Trueは対応するパラメータが固定されていることを示します。 :raises ValueError: 固定パラメータのインデックスが範囲外の場合。 """ fixed = np.zeros(n, dtype=bool) if text is None or str(text).strip() in ('', '*', 'none', 'None'): return fixed for s in parse_str_list(text): idx = int(s) if idx < 0 or idx >= n: raise ValueError(f"Fixed parameter index out of range: {idx}") fixed[idx] = True return fixed
# ---------------------------------------------------------------------- # Argument parser # ----------------------------------------------------------------------
[ドキュメント] def initialize() -> types.SimpleNamespace: """ コマンドライン引数をパースし、設定オブジェクトを初期化します。 `argparse` を使用して引数を定義・パースします。 パラメータの初期値、独立変数のラベル、従属変数のラベル、フィッティング範囲、 最適化メソッド、最大反復回数、許容誤差、境界、固定パラメータ、数値微分のステップサイズ、 計算グリッド設定、出力パス、信頼区間レベル、プロットオプションなどを設定します。 パース後、浮動小数点数リストや文字列リスト、マスク配列などに変換し、 設定オブジェクト (`types.SimpleNamespace`) として返します。 全てのパラメータが固定されている場合はエラーで終了します。 出力ファイルパスを生成します。 :returns: types.SimpleNamespace 設定パラメータを含むオブジェクト。 """ parser = argparse.ArgumentParser( description='Generic nonlinear least-squares fitting with parameter errors and confidence bands' ) # Basic IO and model parser.add_argument('--mode', type=str, default='fit', choices=['fit', 'sim'], help='fit or sim') parser.add_argument('--infile', type=str, default='peak.xlsx', help='Input Excel/CSV/text file') parser.add_argument('--func', type=str, default='p[0]*exp(-((x[0]-p[1])/p[2])**2)', help='Model function. Use x[0], x[1], ... and p[0], p[1], ...') parser.add_argument('--p0', type=str, default='1.3,0.6,0.1', help='Initial parameters, comma separated') parser.add_argument('--xlabels', type=str, default='0', help='Independent variable labels or column indices, comma separated') parser.add_argument('--ylabel', type=str, default='1', help='Dependent variable label or column index') parser.add_argument('--fit_range', type=str, default='*', help='Fit range for each x as xmin:xmax, comma separated. Example: -1:1,0:10') # Optimizer parser.add_argument('--method', type=str, default='BFGS', help='scipy.optimize.minimize method') parser.add_argument('--maxiter', type=int, default=1000, help='Maximum optimizer iterations') parser.add_argument('--tol', type=float, default=1.0e-8, help='Optimizer tolerance') parser.add_argument('--bounds', type=str, default='*', help='Bounds for parameters as lower:upper, comma separated. Use with L-BFGS-B/SLSQP/etc.') parser.add_argument('--fix', type=str, default='', help='Fixed parameter indices, comma separated. Example: 0,2') # Numerical derivatives parser.add_argument('--jac_absstep', type=float, default=1.0e-8, help='Absolute step for numerical derivative') parser.add_argument('--jac_relstep', type=float, default=1.0e-5, help='Relative step for numerical derivative') parser.add_argument('--use_jac', type=int, default=1, choices=[0, 1], help='Use numerical Jacobian for optimizer, 0/1') # Calculation grid and output parser.add_argument('--xcalmin', type=str, default='*', help='Calculation x min for 1D plot/grid') parser.add_argument('--xcalmax', type=str, default='*', help='Calculation x max for 1D plot/grid') parser.add_argument('--ncal', type=int, default=201, help='Number of calculation grid points for 1D plot') parser.add_argument('--out_prefix', type=str, default='', help='Output prefix. Default: input file body') parser.add_argument('--xlsm_template', type=str, default='', help='Reserved for compatibility') # Uncertainty and plot options, Launcher friendly 0/1 flags parser.add_argument('--confidence', type=float, default=0.682689492, help='Confidence level. 0.6827 is about 1 sigma') parser.add_argument('--plot', type=int, default=1, choices=[0, 1], help='Show plot, 0/1') parser.add_argument('--nplot_interval', type=int, default=10, help='Update live fitting plot every N iterations. 0 disables live update') parser.add_argument('--plot_pause', type=float, default=0.01, help='Pause time for live plot update') parser.add_argument('--plot_ci', type=int, default=1, choices=[0, 1], help='Plot uncertainty bands, 0/1') parser.add_argument('--plot_sigma_param', type=int, default=1, choices=[0, 1], help='Plot parameter uncertainty band, 0/1') parser.add_argument('--plot_sigma_pred', type=int, default=1, choices=[0, 1], help='Plot prediction uncertainty band, 0/1') parser.add_argument('--plot_sigma_combined', type=int, default=0, choices=[0, 1], help='Plot param + measured-y scatter band, 0/1') parser.add_argument('--figsize', type=float, nargs=2, default=[10.0, 5.0], help='Figure size') parser.add_argument('--fontsize', type=int, default=14, help='Font size') parser.add_argument('--fontsize_legend', type=int, default=10, help='Legend font size') parser.add_argument('--pause', type=int, default=1, choices=[0, 1], help='Pause before termination, 0/1') cfg = parser.parse_args() cfg.p0 = np.asarray(parse_float_list(cfg.p0, 'p0'), dtype=float) cfg.xlabels_list = parse_str_list(cfg.xlabels) cfg.nparam = len(cfg.p0) cfg.fixed_mask = parse_fixed(cfg.fix, cfg.nparam) cfg.active_mask = ~cfg.fixed_mask cfg.bounds_full = parse_bounds(cfg.bounds, cfg.nparam) if np.sum(cfg.active_mask) == 0: terminate('Error: all parameters are fixed.', cfg.pause) if cfg.out_prefix.strip() == '': cfg.out_prefix = os.path.splitext(cfg.infile)[0] cfg.output_fitting_path = cfg.out_prefix + '-fit.xlsx' cfg.output_parameter_path = cfg.out_prefix + '-parameters.xlsx' cfg.output_convergence_path = cfg.out_prefix + '-convergence.xlsx' return cfg
# ---------------------------------------------------------------------- # Data loading # ---------------------------------------------------------------------- def _column_by_label_or_index(df: pd.DataFrame, key: str) -> Tuple[str, np.ndarray]: """ DataFrameから指定されたラベルまたはインデックスに対応する列を抽出し、NumPy配列として返します。 `key` がDataFrameの列名として存在する場合、その列を使用します。 `key` が整数として解釈できる場合、そのインデックスの列を使用します。 抽出された列は浮動小数点数型に変換されます。 列が見つからない、またはインデックスが範囲外の場合、エラーが発生します。 :param df: pandas.DataFrame 処理対象のDataFrame。 :param key: str 列のラベルまたは整数インデックス。 :returns: Tuple[str, numpy.ndarray] 抽出された列のラベルとNumPy配列のタプル。 :raises KeyError: 指定されたキーが列ラベルとして見つからず、かつ整数としてパースできない場合。 :raises IndexError: 指定されたインデックスがDataFrameの列の範囲外の場合。 """ key = str(key).strip() if key in df.columns: label = key else: try: idx = int(key) except Exception as exc: raise KeyError(f"Column [{key}] was not found as label or integer index") from exc if idx < 0 or idx >= len(df.columns): raise IndexError(f"Column index out of range: {idx}") label = df.columns[idx] return str(label), pd.to_numeric(df[label], errors='coerce').to_numpy(dtype=float)
[ドキュメント] def read_table(path: str) -> pd.DataFrame: """ 指定されたパスのファイルからデータを読み込み、DataFrameとして返します。 ファイルの拡張子に基づいて、Excelファイル (`.xlsx`, `.xlsm`, `.xls`) または CSVファイル (`.csv`) として読み込みを試みます。 上記のいずれにも該当しない場合、空白またはタブ区切りのテキストファイルとして 読み込みを試みます。 ファイルが存在しない場合はエラーで終了します。 :param path: str 読み込むファイルのパス。 :returns: pandas.DataFrame 読み込まれたデータを含むDataFrame。 """ if not os.path.exists(path): terminate(f"Error: input file does not exist: {path}", 1) ext = os.path.splitext(path)[1].lower() if ext in ['.xlsx', '.xlsm', '.xls']: return pd.read_excel(path, engine='openpyxl') if ext in ['.csv']: return pd.read_csv(path) # Whitespace or tab separated text fallback try: return pd.read_csv(path, sep=None, engine='python') except Exception: return pd.read_csv(path, delim_whitespace=True)
[ドキュメント] def load_data(cfg: types.SimpleNamespace) -> Dict[str, Any]: """ 設定オブジェクトに基づいて入力データを読み込み、処理します。 `cfg.infile` からデータを読み込みます。 `cfg.xlabels_list` と `cfg.ylabel` に基づいて独立変数 `X` と従属変数 `y` を抽出します。 `cfg.fit_range` に基づいて、有効なフィッティングデータポイントをマスクします。 有効なデータポイントがない場合、エラーで終了します。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :returns: Dict[str, Any] 処理されたデータを含む辞書。キーは 'df', 'xlabels', 'ylabel', 'x', 'y', 'ranges'。 """ df = read_table(cfg.infile) xlabels = [] xlist = [] for key in cfg.xlabels_list: label, arr = _column_by_label_or_index(df, key) xlabels.append(label) xlist.append(arr) ylabel, y = _column_by_label_or_index(df, cfg.ylabel) Xraw = np.vstack(xlist) ranges = parse_ranges(cfg.fit_range, len(xlabels)) mask = np.isfinite(y) for j in range(len(xlabels)): xmin, xmax = ranges[j] mask &= np.isfinite(Xraw[j]) mask &= (xmin <= Xraw[j]) & (Xraw[j] <= xmax) X = Xraw[:, mask] yfit = y[mask] if len(yfit) == 0: terminate('Error: no valid data points in the specified fit range.', cfg.pause) return { 'df': df, 'xlabels': xlabels, 'ylabel': ylabel, 'x': X, 'y': yfit, 'ranges': ranges, }
# ---------------------------------------------------------------------- # Safe-ish expression evaluation # ----------------------------------------------------------------------
[ドキュメント] def build_eval_env() -> Dict[str, Any]: """ モデル関数を安全に評価するための環境(名前空間)を構築します。 `eval()` 関数で使用されるグローバル名前空間を制限し、 許可されたモジュールや関数(`numpy` の一部関数、数学関数など)のみを含めます。 これにより、モデル関数文字列の評価におけるセキュリティリスクを低減します。 :returns: Dict[str, Any] 評価環境として使用される辞書。 """ allowed = { '__builtins__': {}, 'np': np, 'numpy': np, 'pi': np.pi, 'e': np.e, 'exp': np.exp, 'log': np.log, 'log10': np.log10, 'sqrt': np.sqrt, 'abs': np.abs, 'sin': np.sin, 'cos': np.cos, 'tan': np.tan, 'arcsin': np.arcsin, 'arccos': np.arccos, 'arctan': np.arctan, 'sinh': np.sinh, 'cosh': np.cosh, 'tanh': np.tanh, 'minimum': np.minimum, 'maximum': np.maximum, 'where': np.where, 'clip': np.clip, } return allowed
[ドキュメント] def cal_y_one(x_one: np.ndarray, p: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any]) -> float: """ 単一の独立変数ベクトル `x_one` とパラメータ `p` を使用してモデル関数を評価します。 `cfg.func` で指定されたモデル関数文字列を `eval()` を用いて評価します。 評価環境 `env` と、独立変数 `x` およびパラメータ `p` のローカル変数として提供されます。 :param x_one: numpy.ndarray 単一のデータ点における独立変数のベクトル。 :param p: numpy.ndarray パラメータのベクトル。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境(グローバル名前空間)。 :returns: float モデル関数の計算結果。 """ val = eval(cfg.func, env, {'x': x_one, 'p': p}) return float(val)
[ドキュメント] def cal_y_array(x: np.ndarray, p: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any]) -> np.ndarray: """ 複数の独立変数データポイント `x` とパラメータ `p` を使用してモデル関数を評価します。 `x` は `(nvar, ndata)` の形状を持つ配列で、各列が1つのデータポイントに対応します。 `cal_y_one` を各データポイントに対して繰り返し呼び出し、結果をNumPy配列として返します。 :param x: numpy.ndarray 独立変数の配列(形状: (nvar, ndata))。 :param p: numpy.ndarray パラメータのベクトル。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境(グローバル名前空間)。 :returns: numpy.ndarray モデル関数の計算結果の配列(形状: (ndata,))。 """ y = np.empty(x.shape[1], dtype=float) for i in range(x.shape[1]): y[i] = cal_y_one(x[:, i], p, cfg, env) return y
# ---------------------------------------------------------------------- # Parameter packing/unpacking for fixed parameters # ----------------------------------------------------------------------
[ドキュメント] def pack_active(p_full: np.ndarray, active_mask: np.ndarray) -> np.ndarray: """ 全パラメータベクトルからアクティブな(固定されていない)パラメータを抽出します。 `active_mask` が `True` の位置にある要素のみを新しい配列として返します。 :param p_full: numpy.ndarray 全パラメータのベクトル。 :param active_mask: numpy.ndarray アクティブなパラメータを示すブールマスク。 :returns: numpy.ndarray アクティブなパラメータのみを含むベクトル。 """ return np.asarray(p_full[active_mask], dtype=float)
[ドキュメント] def unpack_active(q: np.ndarray, p_template: np.ndarray, active_mask: np.ndarray) -> np.ndarray: """ アクティブなパラメータ `q` を、固定されたパラメータを含む完全なパラメータベクトル `p` に展開します。 `p_template` をコピーし、`active_mask` が `True` の位置に `q` の要素を挿入します。 :param q: numpy.ndarray アクティブなパラメータのみを含むベクトル。 :param p_template: numpy.ndarray 固定されたパラメータを含む完全なパラメータのテンプレートベクトル。 :param active_mask: numpy.ndarray アクティブなパラメータを示すブールマスク。 :returns: numpy.ndarray qの値が埋め込まれた完全なパラメータベクトル。 """ p = np.asarray(p_template, dtype=float).copy() p[active_mask] = q return p
[ドキュメント] def active_bounds(bounds_full: Optional[List[Tuple[Optional[float], Optional[float]]]], active_mask: np.ndarray) -> Optional[List[Tuple[Optional[float], Optional[float]]]]: """ 全パラメータの境界リストからアクティブなパラメータの境界のみを抽出します。 `bounds_full` が `None` の場合は `None` を返します。 それ以外の場合、`active_mask` に対応する境界のみを抽出してリストとして返します。 :param bounds_full: Optional[List[Tuple[Optional[float], Optional[float]]]] 全パラメータの境界のリスト。 :param active_mask: numpy.ndarray アクティブなパラメータを示すブールマスク。 :returns: Optional[List[Tuple[Optional[float], Optional[float]]]] アクティブなパラメータの境界のリスト、または `None`。 """ if bounds_full is None: return None return [b for b, active in zip(bounds_full, active_mask) if active]
# ---------------------------------------------------------------------- # Objective and derivatives # ----------------------------------------------------------------------
[ドキュメント] def residual_vector(p: np.ndarray, x: np.ndarray, y: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any]) -> np.ndarray: """ モデルの残差ベクトル(観測値 - モデル予測値)を計算します。 `cal_y_array` を呼び出してモデル予測値を計算し、それを観測値 `y` から減算します。 :param p: numpy.ndarray パラメータのベクトル。 :param x: numpy.ndarray 独立変数の配列。 :param y: numpy.ndarray 従属変数の観測値。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境。 :returns: numpy.ndarray 残差のベクトル。 """ return y - cal_y_array(x, p, cfg, env)
[ドキュメント] def objective_q(q: np.ndarray, p_template: np.ndarray, x: np.ndarray, y: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any], history: Optional[Dict[str, list]] = None) -> float: """ 最適化の目的関数(二乗平均誤差、MSE)を計算します。 アクティブなパラメータ `q` を完全なパラメータベクトル `p` に展開します。 モデル残差を計算し、二乗和(RSS)を求め、データ数で割ってMSEを計算します。 `history` が指定されている場合、現在のパラメータとMSEを記録します。 MSEが有限でない場合、非常に大きな値を返します。 :param q: numpy.ndarray アクティブなパラメータのベクトル。 :param p_template: numpy.ndarray 固定されたパラメータを含む完全なパラメータのテンプレート。 :param x: numpy.ndarray 独立変数の配列。 :param y: numpy.ndarray 従属変数の観測値。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境。 :param history: Optional[Dict[str, list]] (オプション) 最適化履歴を記録するための辞書。 :returns: float 二乗平均誤差 (MSE)。 """ p = unpack_active(q, p_template, cfg.active_mask) r = residual_vector(p, x, y, cfg, env) rss = float(r @ r) mse = rss / len(y) if history is not None: history['last_p'] = p.copy() history['last_mse'] = mse if not np.isfinite(mse): return 1.0e300 return mse
[ドキュメント] def step_for_param(value: float, cfg: types.SimpleNamespace) -> float: """ 数値微分に使用するパラメータのステップサイズを計算します。 `cfg.jac_absstep` (絶対ステップ) と `cfg.jac_relstep` (相対ステップ) に基づいて ステップサイズを決定します。相対ステップは、パラメータの絶対値または1.0のうち 大きい方に乗算されます。 :param value: float ステップサイズを計算する対象のパラメータ値。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :returns: float 計算されたステップサイズ。 """ return cfg.jac_absstep + cfg.jac_relstep * max(abs(float(value)), 1.0)
[ドキュメント] def gradient_q(q: np.ndarray, p_template: np.ndarray, x: np.ndarray, y: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any]) -> np.ndarray: """ アクティブなパラメータに関する目的関数 `objective_q` の勾配を数値的に計算します。 各アクティブパラメータについて、中心差分法を用いて勾配の要素を計算します。 :param q: numpy.ndarray アクティブなパラメータのベクトル。 :param p_template: numpy.ndarray 固定されたパラメータを含む完全なパラメータのテンプレート。 :param x: numpy.ndarray 独立変数の配列。 :param y: numpy.ndarray 従属変数の観測値。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境。 :returns: numpy.ndarray 目的関数の勾配ベクトル。 """ g = np.empty_like(q, dtype=float) for k in range(len(q)): h = step_for_param(q[k], cfg) qm = q.copy() qp = q.copy() qm[k] -= h qp[k] += h fm = objective_q(qm, p_template, x, y, cfg, env) fp = objective_q(qp, p_template, x, y, cfg, env) g[k] = (fp - fm) / (2.0 * h) return g
[ドキュメント] def numerical_model_jacobian(p: np.ndarray, x: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any]) -> np.ndarray: """ モデル関数 `f(x, p)` の、アクティブなパラメータに関するヤコビアン行列を数値的に計算します。 ヤコビアン `J` は `(ndata, n_active)` の形状を持ちます。 各アクティブパラメータ `p[ip]` について、中心差分法を用いて `f` の偏導関数を計算します。 :param p: numpy.ndarray 完全なパラメータのベクトル(固定パラメータを含む)。 :param x: numpy.ndarray 独立変数の配列(形状: (nvar, ndata))。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境。 :returns: numpy.ndarray モデルのヤコビアン行列 J。 """ active_indices = np.where(cfg.active_mask)[0] ndata = x.shape[1] J = np.empty((ndata, len(active_indices)), dtype=float) for col, ip in enumerate(active_indices): h = step_for_param(p[ip], cfg) pm = p.copy() pp = p.copy() pm[ip] -= h pp[ip] += h ym = cal_y_array(x, pm, cfg, env) yp = cal_y_array(x, pp, cfg, env) J[:, col] = (yp - ym) / (2.0 * h) return J
[ドキュメント] def covariance_from_jacobian(J: np.ndarray, residuals: np.ndarray) -> Dict[str, Any]: """ 非線形最小二乗法の線形化に基づいて、パラメータの共分散行列を推定します。 共分散行列 `cov` は `sigma^2 * inv(J^T J)` として計算されます。 `sigma^2` (残差分散) は `RSS / dof` (自由度) から求められます。 自由度が0以下の場合、または `J^T J` の逆行列が計算できない場合、共分散は推定されません。 診断メッセージや有効性フラグを含む辞書を返します。 :param J: numpy.ndarray モデルのヤコビアン行列(形状: (ndata, n_active))。 :param residuals: numpy.ndarray 残差ベクトル(観測値 - モデル予測値)。 :returns: Dict[str, Any] 推定された共分散情報を含む辞書。 """ ndata, npar = J.shape rss = float(residuals @ residuals) dof = ndata - npar result = { 'rss': rss, 'ndata': ndata, 'npar_active': npar, 'dof': dof, 'sigma2_resid': np.nan, 'sigma_resid': np.nan, 'cov_active': None, 'std_active': None, 'corr_active': None, 'valid': False, 'message': '', } if dof <= 0: result['message'] = ( f'WARNING: ndata={ndata} <= n_active_parameters={npar}. ' 'Parameter errors and confidence bands are not estimated.' ) return result JTJ = J.T @ J try: JTJ_inv = np.linalg.pinv(JTJ) except Exception as exc: result['message'] = f'WARNING: could not invert J^T J: {exc}' return result sigma2 = rss / dof cov = sigma2 * JTJ_inv diag = np.diag(cov) if np.any(diag < 0): result['message'] = 'WARNING: covariance has negative diagonal elements; errors are not reliable.' return result std = np.sqrt(diag) denom = np.outer(std, std) with np.errstate(divide='ignore', invalid='ignore'): corr = cov / denom result.update({ 'sigma2_resid': sigma2, 'sigma_resid': np.sqrt(sigma2), 'cov_active': cov, 'std_active': std, 'corr_active': corr, 'valid': True, 'message': 'OK', }) return result
[ドキュメント] def expand_active_vector(v_active: Optional[np.ndarray], cfg: types.SimpleNamespace, fill: float = np.nan) -> np.ndarray: """ アクティブなパラメータに対応するベクトルを、固定されたパラメータを含む完全なパラメータ数に拡張します。 `v_active` が `None` の場合、完全なベクトルは `fill` 値で埋められます。 `v_active` が与えられた場合、`cfg.active_mask` に従って `v` の適切な位置に値を配置し、 `cfg.fixed_mask` の位置は `0.0` で埋めます。 :param v_active: Optional[numpy.ndarray] アクティブなパラメータのベクトル。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param fill: float (オプション) アクティブでないパラメータの埋め込み値(デフォルトは NaN)。 :returns: numpy.ndarray 完全なパラメータ数に拡張されたベクトル。 """ v = np.full(cfg.nparam, fill, dtype=float) if v_active is not None: v[cfg.active_mask] = v_active v[cfg.fixed_mask] = 0.0 return v
[ドキュメント] def expand_active_matrix(m_active: Optional[np.ndarray], cfg: types.SimpleNamespace, fill: float = np.nan) -> np.ndarray: """ アクティブなパラメータに対応する行列を、固定されたパラメータを含む完全なパラメータ数に拡張します。 `m_active` が `None` の場合、完全な行列は `fill` 値で埋められます。 `m_active` が与えられた場合、`cfg.active_mask` に従って `m` の適切な位置に値を配置し、 固定されたパラメータに対応する対角要素は `0.0` で埋めます。 :param m_active: Optional[numpy.ndarray] アクティブなパラメータの行列(共分散行列など)。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param fill: float (オプション) アクティブでないパラメータの埋め込み値(デフォルトは NaN)。 :returns: numpy.ndarray 完全なパラメータ数に拡張された行列。 """ m = np.full((cfg.nparam, cfg.nparam), fill, dtype=float) if m_active is not None: idx = np.where(cfg.active_mask)[0] for ia, i in enumerate(idx): for ja, j in enumerate(idx): m[i, j] = m_active[ia, ja] for i in np.where(cfg.fixed_mask)[0]: m[i, i] = 0.0 return m
# ---------------------------------------------------------------------- # Bands # ----------------------------------------------------------------------
[ドキュメント] def compute_uncertainty_bands(xcal: np.ndarray, p: np.ndarray, cfg: types.SimpleNamespace, env: Dict[str, Any], cov_active: np.ndarray, sigma2_resid: float, confidence: float) -> Dict[str, np.ndarray]: """ デルタ法を用いて、モデル予測値の不確実性バンド(信頼区間)を計算します。 計算グリッド `xcal` におけるモデルの平均予測値 `y_mean` を計算します。 `xcal` におけるモデルのヤコビアン `Jcal` を数値的に計算します。 パラメータの不確実性に起因する分散 `var_param` と、モデル予測の分散 `var_pred` を計算します。 Studentのt分布を使用して、指定された `confidence` レベルに対応する乗数 (`tval`) を求めます。 パラメータ信頼区間 (`ci_param`) と予測信頼区間 (`ci_pred`) を計算します。 :param xcal: numpy.ndarray 計算グリッドの独立変数。 :param p: numpy.ndarray 最適化されたパラメータのベクトル。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param env: Dict[str, Any] 評価環境。 :param cov_active: numpy.ndarray アクティブなパラメータの共分散行列。 :param sigma2_resid: float 残差の分散(sigma^2)。 :param confidence: float 信頼水準 (例: 0.6827)。 :returns: Dict[str, numpy.ndarray] 不確実性バンド情報を含む辞書。 """ y_mean = cal_y_array(xcal, p, cfg, env) Jcal = numerical_model_jacobian(p, xcal, cfg, env) var_param = np.sum((Jcal @ cov_active) * Jcal, axis=1) var_param = np.maximum(var_param, 0.0) sigma_param = np.sqrt(var_param) sigma_pred = np.sqrt(var_param + sigma2_resid) dof = max(1, xcal.shape[1] - Jcal.shape[1]) # central two-sided interval multiplier tval = student_t.ppf(0.5 + confidence / 2.0, dof) return { 'y_mean': y_mean, 'sigma_param': sigma_param, 'sigma_pred': sigma_pred, 'ci_param': tval * sigma_param, 'ci_pred': tval * sigma_pred, 'tval': np.full_like(y_mean, tval, dtype=float), }
[ドキュメント] def make_calculation_grid(x: np.ndarray, cfg: types.SimpleNamespace) -> np.ndarray: """ プロットや計算のために、モデル評価用の独立変数グリッドを生成します。 独立変数が1次元の場合 (`nvar == 1`)、`cfg.xcalmin`、`cfg.xcalmax`、`cfg.ncal` に基づいて 等間隔のグリッドを生成します。`'*'` や空の指定はデータ範囲全体を意味します。 独立変数が多次元の場合 (`nvar > 1`)、元のデータ `x` をそのまま計算グリッドとして使用します。 :param x: numpy.ndarray 元のデータセットの独立変数。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :returns: numpy.ndarray 計算用の独立変数グリッド。 """ nvar, ndata = x.shape if nvar == 1: if cfg.xcalmin in ('*', '', None): xmin = float(np.min(x[0])) else: xmin = float(cfg.xcalmin) if cfg.xcalmax in ('*', '', None): xmax = float(np.max(x[0])) else: xmax = float(cfg.xcalmax) return np.asarray([np.linspace(xmin, xmax, cfg.ncal)], dtype=float) # For multi-dimensional models, a 1D line is ambiguous. # Use the original x rows for uncertainty output. return x.copy()
# ---------------------------------------------------------------------- # Saving and plotting # ----------------------------------------------------------------------
[ドキュメント] def save_outputs(cfg: types.SimpleNamespace, data: Dict[str, Any], p_opt: np.ndarray, p_std: np.ndarray, cov_full: np.ndarray, corr_full: np.ndarray, y_initial: np.ndarray, y_fit: np.ndarray, history: Dict[str, list], cov_info: Dict[str, Any], bands: Optional[Dict[str, np.ndarray]], xcal: np.ndarray) -> None: """ フィッティング結果、パラメータ、共分散、収束履歴などをExcelファイルとして保存します。 フィッティングデータ (`df_fit`)、計算グリッド上のモデルと不確実性バンド (`df_cal`)、 最適化履歴 (`df_conv`)、パラメータ情報 (`df_param`)、共分散行列 (`df_cov`)、 相関行列 (`df_corr`)、およびサマリー情報 (`df_summary`) を生成します。 これらのDataFrameをそれぞれ異なるシートとして、設定された出力Excelファイル (`cfg.output_fitting_path` および `cfg.output_parameter_path`) に書き込みます。 収束履歴は別途CSVファイル (`cfg.output_convergence_path`) としても保存されます。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param data: Dict[str, Any] ロードされたデータ。 :param p_opt: numpy.ndarray 最適化されたパラメータのベクトル。 :param p_std: numpy.ndarray パラメータの標準誤差のベクトル。 :param cov_full: numpy.ndarray 完全な共分散行列。 :param corr_full: numpy.ndarray 完全な相関行列。 :param y_initial: numpy.ndarray 初期パラメータでのモデル予測値。 :param y_fit: numpy.ndarray 最適化されたパラメータでのモデル予測値(データ点上)。 :param history: Dict[str, list] 最適化の収束履歴。 :param cov_info: Dict[str, Any] 共分散計算結果の詳細情報。 :param bands: Optional[Dict[str, numpy.ndarray]] 不確実性バンド情報、または `None`。 :param xcal: numpy.ndarray 計算グリッドの独立変数。 :returns: None """ x = data['x'] y = data['y'] xlabels = data['xlabels'] ylabel = data['ylabel'] print() print(f"Save fitting results to [{cfg.output_fitting_path}]") fit_dict = {} for j, label in enumerate(xlabels): fit_dict[label] = x[j] fit_dict[ylabel] = y fit_dict[f'{ylabel}(initial)'] = y_initial fit_dict[f'{ylabel}(fit)'] = y_fit fit_dict[f'{ylabel}(residual)'] = y - y_fit df_fit = pd.DataFrame(fit_dict) cal_dict = {} for j, label in enumerate(xlabels): cal_dict[f'{label}(cal)'] = xcal[j] if bands is not None: cal_dict[f'{ylabel}(mean)'] = bands['y_mean'] cal_dict[f'{ylabel}(sigma_param)'] = bands['sigma_param'] cal_dict[f'{ylabel}(sigma_pred)'] = bands['sigma_pred'] cal_dict[f'{ylabel}(ci_param)'] = bands['ci_param'] cal_dict[f'{ylabel}(ci_pred)'] = bands['ci_pred'] cal_dict['t_multiplier'] = bands['tval'] df_cal = pd.DataFrame(cal_dict) df_conv = pd.DataFrame(history) param_rows = [] for i in range(cfg.nparam): param_rows.append({ 'index': i, 'initial': cfg.p0[i], 'final': p_opt[i], 'std': p_std[i], 'fixed': int(cfg.fixed_mask[i]), }) df_param = pd.DataFrame(param_rows) df_cov = pd.DataFrame(cov_full, columns=[f'p{i}' for i in range(cfg.nparam)], index=[f'p{i}' for i in range(cfg.nparam)]) df_corr = pd.DataFrame(corr_full, columns=[f'p{i}' for i in range(cfg.nparam)], index=[f'p{i}' for i in range(cfg.nparam)]) df_summary = pd.DataFrame([ {'key': 'func', 'value': cfg.func}, {'key': 'method', 'value': cfg.method}, {'key': 'ndata', 'value': cov_info['ndata']}, {'key': 'n_active_parameters', 'value': cov_info['npar_active']}, {'key': 'dof', 'value': cov_info['dof']}, {'key': 'RSS', 'value': cov_info['rss']}, {'key': 'MSE', 'value': cov_info['rss'] / max(cov_info['ndata'], 1)}, {'key': 'sigma2_resid', 'value': cov_info['sigma2_resid']}, {'key': 'sigma_resid', 'value': cov_info['sigma_resid']}, {'key': 'confidence', 'value': cfg.confidence}, {'key': 'covariance_status', 'value': cov_info['message']}, ]) with pd.ExcelWriter(cfg.output_fitting_path, engine='openpyxl') as writer: df_fit.to_excel(writer, sheet_name='fit_data', index=False) df_cal.to_excel(writer, sheet_name='calculation_grid', index=False) df_conv.to_excel(writer, sheet_name='convergence', index=False) df_summary.to_excel(writer, sheet_name='summary', index=False) print(f"Save parameter results to [{cfg.output_parameter_path}]") with pd.ExcelWriter(cfg.output_parameter_path, engine='openpyxl') as writer: df_param.to_excel(writer, sheet_name='parameters', index=False) df_cov.to_excel(writer, sheet_name='covariance') df_corr.to_excel(writer, sheet_name='correlation') df_summary.to_excel(writer, sheet_name='summary', index=False) print(f"Save convergence history to [{cfg.output_convergence_path}]") df_conv.to_excel(cfg.output_convergence_path, index=False)
[ドキュメント] class LiveFitPlotter: """ 非線形フィッティングの進行状況をリアルタイムで監視するためのライブプロッタークラス。 コンストラクタで初期プロットを設定し、フィッティングデータと初期モデルをプロットします。 `update` メソッドは、最適化の反復中にモデルの現在のフィットとMSEの収束履歴を更新します。 `finalize` メソッドは、最適化完了後の最終状態をプロットに反映させます。 プロットは `matplotlib.pyplot` を使用し、インタラクティブモードで動作します。 """ def __init__(self, cfg: types.SimpleNamespace, data: Dict[str, Any], y_initial: np.ndarray): """ LiveFitPlotter クラスのコンストラクタ。 設定、データ、初期モデル予測値を受け取ります。 `cfg.plot` と `cfg.nplot_interval` に基づいてライブプロットを有効にするかを決定します。 2つのサブプロット(フィット結果とMSE収束)を持つMatplotlibのFigureを初期化します。 データ、初期フィット、および現在のフィットを表すラインをプロットします。 収束プロットの初期設定を行います。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param data: Dict[str, Any] ロードされたデータ。 :param y_initial: numpy.ndarray 初期パラメータでのモデル予測値。 :returns: None """ self.cfg = cfg self.data = data self.y_initial = y_initial self.enabled = bool(cfg.plot) and int(cfg.nplot_interval) > 0 self.fig = None self.ax_fit = None self.ax_conv = None self.line_current = None self.line_conv = None self.nvar = data['x'].shape[0] self.xplot = None if not self.enabled: return # Interactive mode keeps the window responsive during scipy.optimize.minimize(). plt.ion() self.fig, (self.ax_fit, self.ax_conv) = plt.subplots(1, 2, figsize=cfg.figsize) x = data['x'] y = data['y'] ylabel = data['ylabel'] xlabels = data['xlabels'] if self.nvar == 1: self.xplot = x[0] self.ax_fit.plot(x[0], y, 'o', label='data', markersize=3) self.ax_fit.plot(x[0], y_initial, '--', label='initial', linewidth=0.8) (self.line_current,) = self.ax_fit.plot(x[0], y_initial, '-', label='current', linewidth=1.0) self.ax_fit.set_xlabel(xlabels[0], fontsize=cfg.fontsize) else: self.xplot = np.arange(len(y)) self.ax_fit.plot(self.xplot, y, 'o', label='data', markersize=3) self.ax_fit.plot(self.xplot, y_initial, '--', label='initial', linewidth=0.8) (self.line_current,) = self.ax_fit.plot(self.xplot, y_initial, '-', label='current', linewidth=1.0) self.ax_fit.set_xlabel('index', fontsize=cfg.fontsize) self.ax_fit.set_ylabel(ylabel, fontsize=cfg.fontsize) self.ax_fit.tick_params(labelsize=cfg.fontsize) self.ax_fit.legend(fontsize=cfg.fontsize_legend) (self.line_conv,) = self.ax_conv.plot([], [], 'o-', linewidth=0.8, markersize=3) self.ax_conv.set_xlabel('# of iteration', fontsize=cfg.fontsize) self.ax_conv.set_ylabel('MSE', fontsize=cfg.fontsize) self.ax_conv.set_yscale('log') self.ax_conv.tick_params(labelsize=cfg.fontsize) self.fig.suptitle('Fitting progress') self.fig.tight_layout() self.fig.canvas.draw_idle() plt.pause(cfg.plot_pause)
[ドキュメント] def update(self, iteration: int, p_current: np.ndarray, mse_history: Dict[str, list], env: Dict[str, Any]) -> None: """ ライブフィッティングプロットを更新します。 `cfg.nplot_interval` で指定された間隔でのみ更新を行います。 現在のパラメータ `p_current` に基づいてモデル予測値を再計算し、フィットプロットを更新します。 `mse_history` に基づいてMSE収束プロットを更新し、軸の範囲を調整します。 `plt.pause` を使用して、描画を一時停止し、GUIイベントを処理します。 :param iteration: int 現在の最適化の反復回数。 :param p_current: numpy.ndarray 現在のパラメータのベクトル。 :param mse_history: Dict[str, list] MSEの収束履歴。 :param env: Dict[str, Any] 評価環境。 :returns: None """ if not self.enabled: return if iteration % int(self.cfg.nplot_interval) != 0: return if self.fig is None or not plt.fignum_exists(self.fig.number): self.enabled = False return x = self.data['x'] y_current = cal_y_array(x, p_current, self.cfg, env) self.line_current.set_ydata(y_current) self.ax_fit.relim() self.ax_fit.autoscale_view() iters = np.asarray(mse_history['iter'], dtype=float) mses = np.asarray(mse_history['MSE'], dtype=float) mask = np.isfinite(mses) & (mses > 0) self.line_conv.set_data(iters, mses) if len(iters) > 0: self.ax_conv.set_xlim(0, max(1.0, float(np.max(iters)) + 1.0)) if np.any(mask): ymin = float(np.min(mses[mask])) ymax = float(np.max(mses[mask])) if ymin == ymax: ymin *= 0.8 ymax *= 1.2 self.ax_conv.set_ylim(ymin * 0.8, ymax * 1.2) self.fig.canvas.draw_idle() plt.pause(self.cfg.plot_pause)
[ドキュメント] def finalize(self, p_final: np.ndarray, mse_history: Dict[str, list], env: Dict[str, Any]) -> None: """ ライブフィッティングプロットを最終状態に更新します。 最適化が完了した後、最終的なパラメータと履歴でプロットを一度だけ更新します。 これにより、ライブ更新間隔に関わらず常に最終結果が表示されます。 :param p_final: numpy.ndarray 最適化された最終パラメータのベクトル。 :param mse_history: Dict[str, list] MSEの収束履歴。 :param env: Dict[str, Any] 評価環境。 :returns: None """ if not self.enabled: return # Always show the final state, even if it is not exactly on the interval. final_iter = int(mse_history['iter'][-1]) if len(mse_history['iter']) else 0 old_interval = self.cfg.nplot_interval self.cfg.nplot_interval = 1 self.update(final_iter, p_final, mse_history, env) self.cfg.nplot_interval = old_interval
[ドキュメント] def plot_results(cfg: types.SimpleNamespace, data: Dict[str, Any], p_opt: np.ndarray, y_initial: np.ndarray, y_fit: np.ndarray, p_std: np.ndarray, bands: Optional[Dict[str, np.ndarray]], xcal: np.ndarray) -> None: """ フィッティング結果と不確実性バンドをプロットします。 `cfg.plot` が `False` の場合、何もしません。 `cfg.plot_ci` に応じて、モデルフィットとパラメータ誤差の2つのサブプロットを作成します。 独立変数が1次元の場合と多次元の場合でプロット方法を分けます。 データ、初期モデル、フィットしたモデル、および設定に応じてパラメータ信頼区間、 予測信頼区間をプロットします。 パラメータ誤差プロットでは、各パラメータの最終値と標準誤差をエラーバーで表示します。 プロットのタイトル、ラベル、凡例、フォントサイズを設定します。 :param cfg: types.SimpleNamespace 設定パラメータを含むオブジェクト。 :param data: Dict[str, Any] ロードされたデータ。 :param p_opt: numpy.ndarray 最適化されたパラメータのベクトル。 :param y_initial: numpy.ndarray 初期パラメータでのモデル予測値。 :param y_fit: numpy.ndarray 最適化されたパラメータでのモデル予測値(データ点上)。 :param p_std: numpy.ndarray パラメータの標準誤差のベクトル。 :param bands: Optional[Dict[str, numpy.ndarray]] 不確実性バンド情報、または `None`。 :param xcal: numpy.ndarray 計算グリッドの独立変数。 :returns: None """ if not cfg.plot: return x = data['x'] y = data['y'] xlabels = data['xlabels'] ylabel = data['ylabel'] nvar = x.shape[0] if cfg.plot_ci: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=cfg.figsize) else: fig, ax1 = plt.subplots(1, 1, figsize=cfg.figsize) ax2 = None if nvar == 1: ax1.plot(x[0], y, 'o', label='data', markersize=3) ax1.plot(x[0], y_initial, '--', label='initial', linewidth=0.8) if bands is not None: ax1.plot(xcal[0], bands['y_mean'], '-', label='fit', linewidth=1.0) if cfg.plot_sigma_pred: ax1.fill_between(xcal[0], bands['y_mean'] - bands['ci_pred'], bands['y_mean'] + bands['ci_pred'], alpha=0.25, label=f'prediction CI ({cfg.confidence:g})') if cfg.plot_sigma_param: ax1.fill_between(xcal[0], bands['y_mean'] - bands['ci_param'], bands['y_mean'] + bands['ci_param'], alpha=0.35, label=f'parameter CI ({cfg.confidence:g})') else: ax1.plot(x[0], y_fit, '-', label='fit', linewidth=1.0) ax1.set_xlabel(xlabels[0], fontsize=cfg.fontsize) else: idx = np.arange(len(y)) ax1.plot(idx, y, 'o', label='data', markersize=3) ax1.plot(idx, y_initial, '--', label='initial', linewidth=0.8) ax1.plot(idx, y_fit, '-', label='fit', linewidth=1.0) if bands is not None and cfg.plot_sigma_param: ax1.fill_between(idx, bands['y_mean'] - bands['ci_param'], bands['y_mean'] + bands['ci_param'], alpha=0.35, label=f'parameter CI ({cfg.confidence:g})') if bands is not None and cfg.plot_sigma_pred: ax1.fill_between(idx, bands['y_mean'] - bands['ci_pred'], bands['y_mean'] + bands['ci_pred'], alpha=0.25, label=f'prediction CI ({cfg.confidence:g})') ax1.set_xlabel('index', fontsize=cfg.fontsize) ax1.set_ylabel(ylabel, fontsize=cfg.fontsize) ax1.tick_params(labelsize=cfg.fontsize) ax1.legend(fontsize=cfg.fontsize_legend) if ax2 is not None: idx = np.arange(cfg.nparam) ax2.errorbar(idx, p_opt, yerr=p_std, fmt='o', capsize=3, label='parameter ±1σ') ax2.set_xlabel('parameter index', fontsize=cfg.fontsize) ax2.set_ylabel('parameter value', fontsize=cfg.fontsize) ax2.tick_params(labelsize=cfg.fontsize) ax2.legend(fontsize=cfg.fontsize_legend) plt.tight_layout() plt.show()
# ---------------------------------------------------------------------- # Main # ----------------------------------------------------------------------
[ドキュメント] def main() -> None: """ プログラムのメインエントリポイント。 `initialize()` を呼び出して設定をロードします。 `load_data()` を呼び出して入力データを読み込みます。 `build_eval_env()` で評価環境を構築します。 `cfg.mode` が 'sim' (シミュレーション) の場合、フィッティングせずに結果を保存・プロットして終了します。 `cfg.mode` が 'fit' (フィッティング) の場合、`scipy.optimize.minimize` を使用して 非線形最小二乗フィッティングを実行します。 フィッティング中、`LiveFitPlotter` を使用してライブプロットを更新します。 フィッティング完了後、ヤコビアンからパラメータの不確実性(標準誤差、共分散、相関)を推定します。 `compute_uncertainty_bands()` を使用して信頼区間を計算します。 `save_outputs()` と `plot_results()` を呼び出して結果を保存・表示します。 処理中にエラーが発生した場合、詳細なトレースバックを表示し、終了します。 :returns: None """ cfg = initialize() print('Generic nonlinear least-squares fitting') print(f' infile : {cfg.infile}') print(f' func : {cfg.func}') print(f' xlabels : {cfg.xlabels_list}') print(f' ylabel : {cfg.ylabel}') print(f' method : {cfg.method}') print(f' p0 : {cfg.p0}') print(f' fixed mask : {cfg.fixed_mask.astype(int)}') try: data = load_data(cfg) env = build_eval_env() x = data['x'] y = data['y'] print(f' ndata : {len(y)}') print(f' fit ranges : {data["ranges"]}') y_initial = cal_y_array(x, cfg.p0, cfg, env) if cfg.mode == 'sim': print('Simulation mode: no fitting is performed.') history = {'iter': [0], 'MSE': [float(np.mean((y - y_initial)**2))]} p_std = np.full(cfg.nparam, np.nan) cov_full = np.full((cfg.nparam, cfg.nparam), np.nan) corr_full = np.full((cfg.nparam, cfg.nparam), np.nan) cov_info = {'ndata': len(y), 'npar_active': int(np.sum(cfg.active_mask)), 'dof': np.nan, 'rss': float(np.sum((y - y_initial)**2)), 'sigma2_resid': np.nan, 'sigma_resid': np.nan, 'message': 'simulation mode'} save_outputs(cfg, data, cfg.p0, p_std, cov_full, corr_full, y_initial, y_initial, history, cov_info, None, x) plot_results(cfg, data, cfg.p0, y_initial, y_initial, p_std, None, x) pause_if_needed(cfg.pause) return q0 = pack_active(cfg.p0, cfg.active_mask) b_active = active_bounds(cfg.bounds_full, cfg.active_mask) history = {'iter': [], 'MSE': []} hist_state = {'iter': 0, 'last_p': cfg.p0.copy(), 'last_mse': np.nan} live_plotter = LiveFitPlotter(cfg, data, y_initial) grad_methods = {'CG', 'BFGS', 'L-BFGS-B', 'TNC', 'SLSQP', 'trust-constr'} no_grad_methods = {'Nelder-Mead', 'Powell', 'COBYLA'} method_upper_name = cfg.method.strip() use_jac = bool(cfg.use_jac) and (method_upper_name not in no_grad_methods) def callback(q): mse = objective_q(q, cfg.p0, x, y, cfg, env, hist_state) hist_state['iter'] += 1 history['iter'].append(hist_state['iter']) history['MSE'].append(mse) print(f"callback {hist_state['iter']:5d}: MSE={mse:.8e} p={hist_state['last_p']}") live_plotter.update(hist_state['iter'], hist_state['last_p'], history, env) print() print('Minimize:') kwargs = dict( fun=lambda q: objective_q(q, cfg.p0, x, y, cfg, env, hist_state), x0=q0, method=cfg.method, callback=callback, tol=cfg.tol, options={'maxiter': cfg.maxiter, 'disp': True}, ) if b_active is not None: kwargs['bounds'] = b_active if use_jac: kwargs['jac'] = lambda q: gradient_q(q, cfg.p0, x, y, cfg, env) res = minimize(**kwargs) p_opt = unpack_active(res.x, cfg.p0, cfg.active_mask) y_fit = cal_y_array(x, p_opt, cfg, env) residuals = y - y_fit mse_final = float(np.mean(residuals**2)) print() print('Optimization result:') print(f' success : {res.success}') print(f' message : {res.message}') print(f' nit : {getattr(res, "nit", "-")}') print(f' MSE : {mse_final:.12g}') print(f' p : {p_opt}') J = numerical_model_jacobian(p_opt, x, cfg, env) cov_info = covariance_from_jacobian(J, residuals) print() print('Uncertainty estimate:') print(f" status : {cov_info['message']}") print(f" ndata : {cov_info['ndata']}") print(f" n_active_param: {cov_info['npar_active']}") print(f" dof : {cov_info['dof']}") print(f" RSS : {cov_info['rss']:.12g}") print(f" sigma_resid : {cov_info['sigma_resid']}") p_std = expand_active_vector(cov_info['std_active'], cfg) cov_full = expand_active_matrix(cov_info['cov_active'], cfg) corr_full = expand_active_matrix(cov_info['corr_active'], cfg) print() print('Fitted parameters:') for i in range(cfg.nparam): if cfg.fixed_mask[i]: print(f' p[{i}] = {p_opt[i]:.12g} fixed') else: print(f' p[{i}] = {p_opt[i]:.12g} +- {p_std[i]:.6g}') xcal = make_calculation_grid(x, cfg) bands = None if cov_info['valid']: bands = compute_uncertainty_bands(xcal, p_opt, cfg, env, cov_info['cov_active'], cov_info['sigma2_resid'], cfg.confidence) save_outputs(cfg, data, p_opt, p_std, cov_full, corr_full, y_initial, y_fit, history, cov_info, bands, xcal) live_plotter.finalize(p_opt, history, env) plot_results(cfg, data, p_opt, y_initial, y_fit, p_std, bands, xcal) pause_if_needed(cfg.pause) except SystemExit: raise except Exception: print('\nERROR:') traceback.print_exc() pause_if_needed(cfg.pause) sys.exit(1)
if __name__ == '__main__': main()