regression.mlsq_error_argparse のソースコード

"""
多項式最小二乗フィッティングおよび不確実性(誤差)の可視化を行うスクリプト。

関連リンク: :doc:`mlsq_error_argparse_usage`
"""

import sys
import argparse
import types
import numpy as np
from math import pow
import matplotlib.pyplot as plt

from tklib.tkutils import terminate, pint, pfloat
from tklib.tkapplication import tkApplication
from tklib.tkvariousdata import tkVariousData
from tklib.tkgraphic.tkplotevent import tkPlotEvent
from tklib.tksci.tkFit import tkFit


[ドキュメント] def initialize(): """ コマンドライン引数を解析し、設定情報を持つ名前空間を初期化する。 詳細設定などのコマンドライン引数を受け取り、入出力ファイルのパスも構築して設定オブジェクトに格納する。 :returns: app (tkApplicationインスタンス), cfg (設定を格納したtypes.SimpleNamespace), parser (argparse.ArgumentParser) のタプル。 """ app = tkApplication() parser = argparse.ArgumentParser(description='Polynomial least-squares fitting with uncertainty visualization') parser.add_argument('--infile', type=str, default='random-poly.xlsx', help='Input data file') parser.add_argument('--xlabel', type=str, default='0', help='Label or index for x data') parser.add_argument('--ylabel', type=str, default='1', help='Label or index for y data') parser.add_argument('--norder', type=int, default=3, help='Polynomial order') parser.add_argument('--xmin', type=float, default=-1.0e100, help='Minimum x for fitting') parser.add_argument('--xmax', type=float, default=1.0e100, help='Maximum x for fitting') parser.add_argument('--xcalmin', type=str, default=None, help='Minimum x for calculation grid') parser.add_argument('--xcalmax', type=str, default=None, help='Maximum x for calculation grid') parser.add_argument('--ncal', type=int, default=201, help='Number of points in xcal grid') parser.add_argument('--xlsm_template', type=str, default="StandardGraph.xlsm", help='Excel template path') parser.add_argument('--figsize', type=float, nargs=2, default=[8, 8], help='プロット figsize, 例: --figsize 8 8') parser.add_argument('--plot_ci', type=int, default=1, help='Plot confidence interval') parser.add_argument('--plot_sigma_param', type=int, default=1, help='Plot parameter uncertainty band') parser.add_argument('--plot_sigma_pred', type=int, default=1, help='Plot prediction uncertainty band') parser.add_argument('--plot_sigma_combined', type=int, default=0, help='Plot combined uncertainty band') parser.add_argument('--fontsize', type=int, default=16, help='Font size for plots') parser.add_argument('--fontsize_legend', type=int, default=12, help='Legend font size') group = parser.add_mutually_exclusive_group() group.add_argument('--pause', dest='pause', action='store_true', help='Pause on terminate') group.add_argument('--no-pause', dest='pause', action='store_false', help='Do not pause on terminate') parser.set_defaults(pause=True) args = parser.parse_args() cfg = types.SimpleNamespace() for key in vars(args): setattr(cfg, key, getattr(args, key)) cfg.output_fitting_path = app.replace_path(cfg.infile, template = ["{dirname}", "{filebody}-fit.xlsm"]) cfg.output_parameter_path = app.replace_path(cfg.infile, template = ["{dirname}", "{filebody}-parameters.xlsx"]) return app, cfg, parser
[ドキュメント] def build_design_matrix(x, order): """ 多項式回帰のための計画行列(デザインマトリクス)を構築する。 与えられたデータ点xと多項式の次数orderに基づいて、各列がxの累乗となる行列を作成する。 :param x: 入力データ点の配列(array-like)。 :param order: 多項式の次数(int)。 :returns: (N, order+1) の形状を持つ計画行列(np.ndarray)。 """ x = np.asarray(x) N = len(x) X = np.ones((N, order + 1)) for j in range(1, order + 1): # Using scalar pow for clarity X[:, j] = [pow(val, j) for val in x] return X
[ドキュメント] def mlsq_error(X, y): """ 最小二乗法によるフィッティングを実行する。 計画行列Xと観測値yから、回帰係数、係数の標準誤差、共分散行列、および残差分散を計算する。 :param X: 計画行列((N, p) のnp.ndarray)。 :param y: 観測値の配列((N,) のarray-like)。 :returns: 推定された係数 beta (np.ndarray)、係数の標準誤差 beta_std (np.ndarray)、パラメータの共分散行列 cov_beta (np.ndarray)、残差分散 sigma2_resid (float) のタプル。 """ y = np.asarray(y) XtX = X.T @ X XtX_inv = np.linalg.inv(XtX) beta = XtX_inv @ (X.T @ y) # Residuals and variance residuals = y - X @ beta N, p = X.shape RSS = float(residuals.T @ residuals) sigma2_resid = RSS / (N - p) cov_beta = sigma2_resid * XtX_inv beta_std = np.sqrt(np.diag(cov_beta)) return beta, beta_std, cov_beta, sigma2_resid
[ドキュメント] def compute_param_uncertainty(X, cov_beta): """ 各データ点におけるパラメータ由来の予測分散を計算する。 計画行列とパラメータの共分散行列から、予測値の分散を求める。 :param X: 評価点における計画行列((N, p) のnp.ndarray)。 :param cov_beta: パラメータの共分散行列((p, p) のnp.ndarray)。 :returns: 各点での予測分散 y_var ((N,) のnp.ndarray)。 """ # Var[y_mean] = diag(X @ cov_beta @ X.T) return np.sum((X @ cov_beta) * X, axis=1)
[ドキュメント] def compute_measurement_error(y): """ 入力データの分散から測定誤差を推定する。 観測値yの不偏分散を用いて、測定値の標準偏差を計算する。 :param y: 観測値の配列(array-like)。 :returns: 推定された測定誤差の標準偏差(float)。 """ y = np.asarray(y) var_unbiased = np.var(y, ddof=1) if len(y) > 1 else 0.0 return np.sqrt(var_unbiased)
[ドキュメント] def compute_bands(xcal, beta, cov_beta, sigma2_resid, sigma_meas): """ 計算グリッド上での平均予測値と各種不確実性(誤差)バンドを計算する。 パラメータの不確実性、残差、および測定誤差を考慮した標準偏差をそれぞれ算出する。 :param xcal: 評価を行うxのデータ点(array-like)。 :param beta: 最小二乗法で推定された係数(np.ndarray)。 :param cov_beta: パラメータの共分散行列(np.ndarray)。 :param sigma2_resid: 残差分散(float)。 :param sigma_meas: 測定誤差の標準偏差(float)。 :returns: 平均予測値('y_mean')、パラメータ由来の標準偏差('sigma_param')、予測の標準偏差('sigma_pred')、複合された標準偏差('sigma_combined')を含む辞書(dict)。 """ Xcal = build_design_matrix(xcal, len(beta) - 1) y_mean = Xcal @ beta var_param = compute_param_uncertainty(Xcal, cov_beta) sigma_param = np.sqrt(var_param) sigma_pred = np.sqrt(var_param + sigma2_resid) sigma_combined = np.sqrt(var_param + sigma_meas**2) return { 'y_mean': y_mean, 'sigma_param': sigma_param, 'sigma_pred': sigma_pred, 'sigma_combined': sigma_combined }
[ドキュメント] def load_data(infile, xlabel, ylabel, xmin, xmax, pause_flag): """ tkVariousDataを使用してファイルからデータを読み込み、xminとxmaxでフィルタリングする。 指定されたラベルまたはインデックスに基づいてxとyのデータを抽出し、指定範囲内のデータを返す。 :param infile: 入力ファイルのパス(str)。 :param xlabel: xデータのラベルまたはインデックス(str)。 :param ylabel: yデータのラベルまたはインデックス(str)。 :param xmin: xの最小値(float)。 :param xmax: xの最大値(float)。 :param pause_flag: エラー終了時に一時停止するかどうかのフラグ(bool)。 :returns: 抽出されたデータ 'x' と 'y' のリストを含む辞書(dict)。 """ datafile = tkVariousData(infile) labels, _ = datafile.Read_minimum_matrix(close_fp=True, force_numeric=False) _xlabel, xin = datafile.FindDataArray(xlabel, flag='i') _ylabel, yin = datafile.FindDataArray(ylabel, flag='i') if xin is None: terminate(f"Error: xlabel [{xlabel}] not found", pause=pause_flag) if yin is None: terminate(f"Error: ylabel [{ylabel}] not found", pause=pause_flag) x = [] y = [] for xi, yi in zip(xin, yin): if xmin <= xi <= xmax: x.append(xi) y.append(yi) if len(x) == 0: terminate("Error: No data in specified x-range", pause=pause_flag) return {'x': x, 'y': y}
# ===================================================================== # Section 6: Main Execution and Plotting # =====================================================================
[ドキュメント] def main(): """ メイン実行関数。 コマンドライン引数のパース、データの読み込み、多項式による最小二乗フィッティングの実行、 不確実性の計算、結果のExcelファイルへの保存、およびプロットによる結果の可視化を行う。 :returns: なし """ app, cfg, parser = initialize() # Open logfile if desired logfile = app.replace_path(cfg.infile) print() print(f"Open logfile [{logfile}]") app.redict(targets=["stdout", logfile], mode='w') # Parse xlabel/ylabel: allow numeric index or string label xlabel = cfg.xlabel ylabel = cfg.ylabel # xmin/xmax are floats already print() print(f"Configuration: infile={cfg.infile}, xlabel={xlabel}, ylabel={ylabel}, norder={cfg.norder}") print(f"Fitting range: {cfg.xmin} to {cfg.xmax}") # Load data data = load_data(cfg.infile, xlabel, ylabel, cfg.xmin, cfg.xmax, cfg.pause) x = data['x'] y = data['y'] ndata = len(x) print(f"ndata = {ndata}") for xi, yi in zip(x, y): print(f"{xi:12.4g} {yi:12.4g}") # Fit parameters X = build_design_matrix(x, cfg.norder) beta, beta_std, cov_beta, sigma2_resid = mlsq_error(X, y) ycal = X @ beta print("Fitted polynomial coefficients:") for i, coef in enumerate(beta): print(f" c{i} = {coef:g} +- {beta_std[i]:g}") print(f" Residual variance sigma2_resid = {sigma2_resid:g}") print(f"Save parameters and stds to {cfg.output_parameter_path}") fit = tkFit() fit.to_excel(cfg.output_parameter_path, ["coeff", "std"], [beta, beta_std]) if cfg.xcalmin == '*' or cfg.xcalmin is None: xcalmin = min(x) else: xcalmin = cfg.xcalmin if cfg.xcalmax == '*' or cfg.xcalmax is None: xcalmax = max(x) else: xcalmax = cfg.xcalmax xcal = np.linspace(xcalmin, xcalmax, cfg.ncal) print() print(f"Calculation grid: {xcalmin} to {xcalmax}, points = {cfg.ncal}") sigma_meas = compute_measurement_error(y) print(f"Estimated measurement error sigma_meas = {sigma_meas:g}") bands = compute_bands(xcal, beta, cov_beta, sigma2_resid, sigma_meas) print("") print(f"Save results to [{cfg.output_fitting_path}]") fit.to_excel(cfg.output_fitting_path, [xlabel, ylabel, f"{ylabel}(fit)", "", f"{xlabel}(cal)", f"{ylabel}(mean)", f"{ylabel}(std(param))", f"{ylabel}(std(param&resid)", f"{ylabel}(std(param&noise)"], [x, y, ycal, [], xcal, bands['y_mean'], bands['sigma_param'], bands['sigma_pred'], bands['sigma_combined']], template = cfg.xlsm_template) # Plot if cfg.plot_ci: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10,5)) else: fig, ax1 = plt.subplots(1, 1, figsize=(5,5)) ax2 = None plot_event = tkPlotEvent(plt) # Left: data, fit, bands ax1.plot(x, y, 'o', label='data', color = 'black', markersize=1.5) ax1.plot(xcal, bands['y_mean'], '-', label='fit', linewidth = 0.5, color = 'red') if cfg.plot_sigma_param: ax1.fill_between(xcal, bands['y_mean'] - bands['sigma_param'], bands['y_mean'] + bands['sigma_param'], color='#0000cc', alpha=0.5, label='±σ(param)') if cfg.plot_sigma_pred: ax1.fill_between(xcal, bands['y_mean'] - bands['sigma_pred'], bands['y_mean'] + bands['sigma_pred'], color='#ddddFF', alpha=0.5, label='±σ(param&resid)') if cfg.plot_sigma_combined: ax1.fill_between(xcal, bands['y_mean'] - bands['sigma_combined'], bands['y_mean'] + bands['sigma_combined'], color='purple', alpha=0.5, label='±σ(param+noize)') ax1.set_xlabel(cfg.xlabel, fontsize=cfg.fontsize) ax1.set_ylabel(cfg.ylabel, fontsize=cfg.fontsize) ax1.tick_params(labelsize=cfg.fontsize) ax1.legend(fontsize=cfg.fontsize_legend) if ax2: idxs = np.arange(len(beta)) ax2.errorbar(idxs, beta, yerr=beta_std, fmt='o', capsize=3, label='coeff ±1σ') ax2.set_xlabel('$i$', fontsize=cfg.fontsize) ax2.set_ylabel('$c_i$', fontsize=cfg.fontsize) ax2.tick_params(labelsize=cfg.fontsize) ax2.legend(fontsize=cfg.fontsize_legend) plot_event.register_event(fig, event='button_press_event', callback=lambda e: plot_event.onclick(e)) plt.tight_layout() plt.pause(0.1) # Terminate application app.terminate("", pause=cfg.pause)
if __name__ == '__main__': main()