#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
adaptive_gaussian_ridge_practical.py
概要:
チェビシェフバックグラウンド、重み付きRidge正則化、およびオプションのプルーニングを伴う、
適応型可変幅ガウス基底回帰を実行します。
詳細説明:
このスクリプトは、Excelファイルまたは内蔵デモデータからx-yデータを読み込み、
バックグラウンド項とガウス基底項の和としてデータを回帰します。
バックグラウンド係数とガウス基底係数には異なるRidgeペナルティを設定できます。
幅の広いガウス基底に強いペナルティを与えることもできます。
オプションのプルーニング機能により、寄与の小さいガウス基底を段階的に除去できます。
計算結果はExcelファイルに出力でき、入力データ、フィット結果、バックグラウンド、
ガウス成分、基底パラメータ、プルーニング履歴、設定概要などを保存します。
診断プロット、主たるフィットプロット、個々のガウス成分の分解プロットも表示または保存できます。
使用例:
python adaptive_gaussian_ridge.py --infile demo --mode prune --n-centers 60 --bg-order 3
python adaptive_gaussian_ridge.py --infile data.xlsx --xcol Energy --ycol Intensity --mode prune
python adaptive_gaussian_ridge.py --infile data.xlsx --sheet Sheet1 --xcol 0 --ycol 1 --outfile result.xlsx
関連リンク:
adaptive_gaussian_ridge_usage
"""
import argparse
import os
import sys
import traceback
from dataclasses import dataclass
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
from numpy.polynomial.chebyshev import chebvander
sigma_slope_eps = 1.0e-6
# =============================================================================
# Demo data
# =============================================================================
[ドキュメント]
def true_background(x):
"""
概要:
模擬データの真のバックグラウンド関数を計算します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
戻り値:
:returns: xに対応するバックグラウンド値。
:rtype: numpy.ndarray
"""
return 0.10 + 0.08 * x - 0.025 * x**2
[ドキュメント]
def true_peaks(x):
"""
概要:
模擬データの真のピーク(ガウス関数の和)を計算します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
戻り値:
:returns: xに対応するピーク値。
:rtype: numpy.ndarray
"""
return (
1.2 * np.exp(-0.5 * ((x - 1.0) / 0.25) ** 2)
- 0.8 * np.exp(-0.5 * ((x + 1.5) / 0.15) ** 2)
+ 0.25 * np.exp(-0.5 * ((x - 2.0) / 0.45) ** 2)
)
[ドキュメント]
def true_function(x):
"""
概要:
模擬データの真の関数(バックグラウンドとピークの合計)を計算します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
戻り値:
:returns: xに対応する真の関数値。
:rtype: numpy.ndarray
"""
return true_background(x) + true_peaks(x)
[ドキュメント]
def generate_demo_data(n=180, noise=0.06, seed=1):
"""
概要:
デモ用のx, y, 真の関数データを生成します。
詳細説明:
指定された点数 n とノイズレベル noise に基づいて、
true_function から模擬データを生成します。
引数:
:param n: データ点の数。
:type n: int
:param noise: yデータに加えるノイズの標準偏差。
:type noise: float
:param seed: 乱数生成のシード。
:type seed: int
戻り値:
:returns: x, y, 真のy値のタプル。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]
"""
rng = np.random.default_rng(seed)
x = np.linspace(-3.0, 3.0, n)
y_true = true_function(x)
y = y_true + rng.normal(0.0, noise, size=n)
return x, y, y_true
# =============================================================================
# I/O helpers
# =============================================================================
[ドキュメント]
def safe_import_message(exc, package_hint):
"""
概要:
必要なパッケージのインポートまたは使用が失敗した場合に、エラーメッセージを表示し、プログラムを終了します。
詳細説明:
発生した例外の詳細と、必要なパッケージのインストール方法を示すメッセージをユーザーに提供します。
ユーザーがEnterキーを押すとプログラムが終了します。
引数:
:param exc: 発生した例外オブジェクト。
:type exc: Exception
:param package_hint: インストールを促すパッケージ名(例: "pandas openpyxl")。
:type package_hint: str
戻り値:
:returns: なし (この関数はプログラムを終了します)。
:rtype: None
"""
print("ERROR: required package import/use failed.")
traceback.print_exc()
print(f"\nPlease install required package, e.g.\n pip install {package_hint}")
input("\nPress ENTER to terminate>>\n")
sys.exit(1)
[ドキュメント]
def parse_column_selector(selector, columns):
"""
概要:
列セレクタ(名前またはゼロベースのインデックス)から実際の列名を返します。
詳細説明:
セレクタがNoneの場合、Noneを返します。
セレクタが整数(文字列形式も含む)の場合、利用可能な列リストのインデックスとして解釈します。
セレクタが文字列の場合、利用可能な列名と直接比較します。
セレクタがどの形式でも有効な列名に解決できない場合、ValueErrorを発生させます。
引数:
:param selector: 列名、列インデックス、またはNone。
:type selector: Union[str, int, None]
:param columns: 利用可能な列名のリスト。
:type columns: List[str]
戻り値:
:returns: 解決された列名、またはNone。
:rtype: Union[str, None]
例外:
ValueError: 列セレクタが有効な列名に解決できない場合。
"""
if selector is None:
return None
if isinstance(selector, int):
return columns[selector]
s = str(selector).strip()
if s in columns:
return s
# zero-based integer column index is allowed
try:
idx = int(s)
return columns[idx]
except Exception:
pass
raise ValueError(
f"Column '{selector}' was not found. Available columns: {list(columns)}"
)
[ドキュメント]
def make_default_outfile(infile, suffix="_adaptive_gaussian_ridge.xlsx"):
"""
概要:
入力ファイル名からデフォルトの出力Excelファイル名を生成します。
詳細説明:
入力ファイル名が"demo"の場合は"demo_adaptive_gaussian_ridge.xlsx"を返します。
それ以外の場合は、入力ファイル名から拡張子を除去し、指定されたサフィックスを追加します。
引数:
:param infile: 入力ファイル名(または「demo」)。
:type infile: str
:param suffix: 出力ファイルに追加するサフィックス。デフォルトは "_adaptive_gaussian_ridge.xlsx"。
:type suffix: str
戻り値:
:returns: 生成された出力ファイル名。
:rtype: str
"""
if str(infile).strip().lower() == "demo":
return "demo" + suffix
stem, _ = os.path.splitext(os.path.basename(infile))
return stem + suffix
# =============================================================================
# Basis functions
# =============================================================================
[ドキュメント]
def scale_x_to_chebyshev_domain(x, xmin=None, xmax=None):
"""
概要:
x値をチェビシェフ多項式の定義域 [-1, 1] にスケーリングします。
詳細説明:
入力x値を xmin から xmax の範囲で線形にスケーリングし、
チェビシェフ多項式の標準的な定義域である [-1, 1] にマッピングします。
xmin と xmax が指定されない場合、入力x値の最小値と最大値が使用されます。
引数:
:param x: スケーリングするx値の配列。
:type x: numpy.ndarray
:param xmin: x範囲の最小値。指定しない場合はxの最小値を使用。
:type xmin: Optional[float]
:param xmax: x範囲の最大値。指定しない場合はxの最大値を使用。
:type xmax: Optional[float]
戻り値:
:returns: [-1, 1] にスケーリングされたx値の配列。
:rtype: numpy.ndarray
例外:
ValueError: xmaxがxmin以下の場合。
"""
x = np.asarray(x, dtype=float)
if xmin is None:
xmin = np.min(x)
if xmax is None:
xmax = np.max(x)
if xmax <= xmin:
raise ValueError("xmax must be larger than xmin.")
return 2.0 * (x - xmin) / (xmax - xmin) - 1.0
[ドキュメント]
def background_design_matrix(x, order, xmin, xmax):
"""
概要:
チェビシェフ多項式に基づくバックグラウンドのデザイン行列を生成します。
詳細説明:
入力x値を [-1, 1] にスケーリングした後、指定された次数までの
チェビシェフ多項式を計算し、デザイン行列として返します。
orderが-1の場合、バックグラウンドなしとして空の行列を返します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param order: チェビシェフ多項式の次数。-1の場合、空の行列を返します。
:type order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
戻り値:
:returns: バックグラウンドのデザイン行列。各列が次数に対応するチェビシェフ多項式の値。
:rtype: numpy.ndarray
"""
if order < 0:
return np.empty((len(x), 0))
xr = scale_x_to_chebyshev_domain(x, xmin, xmax)
return chebvander(xr, order)
[ドキュメント]
def gaussian_design_matrix(x, centers, sigmas):
"""
概要:
ガウス基底関数に基づくデザイン行列を生成します。
詳細説明:
指定された中心 centers と標準偏差 sigmas を持つ
ガウス関数を各x点に対して計算し、デザイン行列として返します。
各列は個々のガウス基底に対応します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param centers: 各ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: 各ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
戻り値:
:returns: ガウス基底のデザイン行列。
:rtype: numpy.ndarray
"""
x = np.asarray(x, dtype=float)[:, None]
centers = np.asarray(centers, dtype=float)[None, :]
sigmas = np.asarray(sigmas, dtype=float)[None, :]
return np.exp(-0.5 * ((x - centers) / sigmas) ** 2)
[ドキュメント]
def design_matrix(x, centers, sigmas, bg_order, xmin, xmax):
"""
概要:
バックグラウンドとガウス基底を組み合わせた全体のデザイン行列を生成します。
詳細説明:
background_design_matrixとgaussian_design_matrixを呼び出し、
それらの結果を水平方向に結合して単一のデザイン行列を作成します。
最初の列群がバックグラウンド項、その後の列群がガウス基底項に対応します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
戻り値:
:returns: 全体のデザイン行列。
:rtype: numpy.ndarray
"""
phi_bg = background_design_matrix(x, bg_order, xmin, xmax)
phi_g = gaussian_design_matrix(x, centers, sigmas)
return np.column_stack([phi_bg, phi_g])
[ドキュメント]
def adaptive_centers_from_derivatives(
x,
y,
n_centers=45,
eps=0.15,
w_grad=0.3,
w_curv=1.0,
smooth_window=21,
smooth_order=3,
):
"""
概要:
データの導関数に基づいて適応的にガウス基底の中心を決定します。
詳細説明:
まず、Savitzky-Golayフィルターを使用して入力yデータを平滑化します。
平滑化されたデータから1階導関数 (dy) と2階導関数 (d2y) を計算します。
これらの導関数(絶対値で正規化され、重み付けされたもの)とベースライン定数 eps を組み合わせて
「密度」プロファイルを構築します。この密度プロファイルに基づいてx軸をリサンプリングし、
n_centers個の適応的な中心を決定します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param y: y座標の配列。
:type y: numpy.ndarray
:param n_centers: 生成する中心の数。
:type n_centers: int
:param eps: 密度プロファイルのベースライン定数。
:type eps: float
:param w_grad: 勾配の重み。密度プロファイルにおける1階導関数の寄与を制御します。
:type w_grad: float
:param w_curv: 曲率の重み。密度プロファイルにおける2階導関数の寄与を制御します。
:type w_curv: float
:param smooth_window: Savitzky-Golayフィルターのウィンドウサイズ。
データ点数に応じて自動調整されます。
:type smooth_window: int
:param smooth_order: Savitzky-Golayフィルターの多項式次数。
ウィンドウサイズに応じて自動調整されます。
:type smooth_order: int
戻り値:
:returns: 生成された中心の配列 (centers), 計算された密度プロファイル (density),
平滑化されたyデータ (y_smooth), 1階導関数 (dy), 2階導関数 (d2y) のタプル。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]
例外:
ValueError: データ点数が少なすぎる場合(5点未満)。
"""
x = np.asarray(x, dtype=float)
y = np.asarray(y, dtype=float)
if len(x) < 5:
raise ValueError("At least 5 data points are required.")
# Robust Savitzky-Golay window adjustment.
smooth_window = int(smooth_window)
smooth_order = int(smooth_order)
if smooth_window < smooth_order + 2:
smooth_window = smooth_order + 2
if smooth_window % 2 == 0:
smooth_window += 1
if smooth_window >= len(x):
smooth_window = len(x) if len(x) % 2 == 1 else len(x) - 1
if smooth_window <= smooth_order:
smooth_order = max(1, smooth_window - 2)
y_smooth = savgol_filter(y, smooth_window, smooth_order)
dy = np.gradient(y_smooth, x)
d2y = np.gradient(dy, x)
grad_score = np.abs(dy)
curv_score = np.abs(d2y)
grad_score /= np.max(grad_score) + 1.0e-30
curv_score /= np.max(curv_score) + 1.0e-30
density = eps + w_grad * grad_score + w_curv * curv_score
dx = np.diff(x)
u = np.zeros_like(x)
u[1:] = np.cumsum(0.5 * (density[1:] + density[:-1]) * dx)
u_centers = np.linspace(u[0], u[-1], n_centers)
centers = np.interp(u_centers, u, x)
return centers, density, y_smooth, dy, d2y
[ドキュメント]
def estimate_local_amplitude(
x,
y_smooth,
centers,
window_points=21,
p_low=10.0,
p_high=90.0,
floor_frac=1.0e-4,
):
"""
概要:
各中心周辺の局所的な振幅を推定します。
詳細説明:
バックグラウンドを明示的に使用せずに、平滑化されたyデータの局所パーセンタイル幅から
振幅を推定します。これにより、スペクトルに大きなオフセットやバックグラウンドがある場合でも、
振幅推定がよりロバストになります。各中心の周りのウィンドウ内で、p_lowとp_high
パーセンタイルの差の半分と、中心点の値と局所中央値の差の絶対値の大きい方を採用します。
推定された振幅には、グローバルスケールに基づく小さなフロアが適用されます。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param y_smooth: 平滑化されたy座標の配列。
:type y_smooth: numpy.ndarray
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param window_points: 局所振幅推定に使用するウィンドウ内のデータ点数。
自動的に奇数に調整され、データ点数を超えないようにします。
:type window_points: int
:param p_low: 低い方のパーセンタイル値(例: 10.0)。
:type p_low: float
:param p_high: 高い方のパーセンタイル値(例: 90.0)。
:type p_high: float
:param floor_frac: 振幅の最小値フロアを決定するための、グローバルスケールに対する割合。
:type floor_frac: float
戻り値:
:returns: 各中心における推定振幅の配列。
:rtype: numpy.ndarray
"""
x = np.asarray(x, dtype=float)
y_smooth = np.asarray(y_smooth, dtype=float)
centers = np.asarray(centers, dtype=float)
n = len(x)
window_points = int(window_points)
if window_points < 3:
window_points = 3
if window_points % 2 == 0:
window_points += 1
if window_points > n:
window_points = n if n % 2 == 1 else n - 1
half = window_points // 2
global_scale = np.percentile(y_smooth, 95.0) - np.percentile(y_smooth, 5.0)
amp_floor = floor_frac * max(abs(global_scale), 1.0e-30)
amp = np.empty(len(centers), dtype=float)
for i, c in enumerate(centers):
idx = int(np.argmin(np.abs(x - c)))
i0 = max(0, idx - half)
i1 = min(n, idx + half + 1)
yw = y_smooth[i0:i1]
local_low = np.percentile(yw, p_low)
local_high = np.percentile(yw, p_high)
local_median = np.median(yw)
# local percentile half-width
amp_width = 0.5 * abs(local_high - local_low)
# deviation from local median
y_c = np.interp(c, x, y_smooth)
amp_dev = abs(y_c - local_median)
# use the larger local scale, but keep a small floor
amp[i] = max(amp_width, amp_dev, amp_floor)
return amp
[ドキュメント]
def estimate_variable_sigma(
centers,
alpha=1.6,
m=2,
sigma_min=None,
sigma_max=None,
method="hybrid_slope",
x=None,
y_smooth=None,
dy=None,
alpha_slope=1.0,
sigma_amp_window=21,
slope_eps=1.0e-6,
return_details=False,
):
"""
概要:
ガウス基底の幅(シグマ)を推定します。
詳細説明:
sigma_min と sigma_max を使って、推定されたシグマを必要に応じてクリップします。
method が distance の場合は、隣接する中心間隔の平均に alpha を掛けた値を使います。
method が nearest の場合は、左右の最近接中心との距離の小さい方に alpha を掛けた値を使います。
method が slope の場合は、局所振幅を一階導関数の絶対値と slope_eps の和で割り、alpha_slope を掛けた値を使います。
method が hybrid_slope の場合は、nearest と slope の推定値の小さい方を使います。
slope または hybrid_slope を使う場合は、x, y_smooth, dy が必要です。
引数:
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param alpha: 距離ベースのシグマ推定に使用する乗数。
:type alpha: float
:param m: 距離ベースのシグマ推定で考慮する隣接中心の数。
:type m: int
:param sigma_min: シグマの最小許容値。指定しない場合は適用されません。
:type sigma_min: Optional[float]
:param sigma_max: シグマの最大許容値。指定しない場合は適用されません。
:type sigma_max: Optional[float]
:param method: シグマ推定方法。distance, nearest, slope, hybrid_slope を指定できます。
:type method: str
:param x: x座標の配列。slope または hybrid_slope の場合に必要です。
:type x: Optional[numpy.ndarray]
:param y_smooth: 平滑化されたy座標の配列。slope または hybrid_slope の場合に必要です。
:type y_smooth: Optional[numpy.ndarray]
:param dy: 平滑化されたyの一階導関数。slope または hybrid_slope の場合に必要です。
:type dy: Optional[numpy.ndarray]
:param alpha_slope: 傾きベースのシグマ推定に使用する乗数。
:type alpha_slope: float
:param sigma_amp_window: 傾きベースのシグマ推定で局所振幅を計算する際のウィンドウ点数。
:type sigma_amp_window: int
:param slope_eps: 傾きベースのシグマ推定で分母に追加される小さな値。
:type slope_eps: float
:param return_details: 詳細なシグマ候補を返すかどうか。
:type return_details: bool
戻り値:
:returns: 推定されたシグマの配列。return_details が True の場合はシグマ配列と詳細辞書のタプル。
:rtype: Union[numpy.ndarray, tuple[numpy.ndarray, dict[str, numpy.ndarray]]]
例外:
:raises ValueError: 中心が2つ未満の場合、必要なデータが不足している場合、または不明な推定方法が指定された場合。
"""
centers = np.asarray(centers, dtype=float)
n = len(centers)
if n < 2:
raise ValueError("At least two centers are required to estimate sigma.")
method = str(method).lower()
# Conventional distance estimate using +/- m neighbors.
sigma_dist = np.empty(n)
for j in range(n):
i0 = max(0, j - m)
i1 = min(n - 1, j + m)
if i1 == i0:
local_spacing = centers[1] - centers[0]
else:
local_spacing = (centers[i1] - centers[i0]) / (i1 - i0)
sigma_dist[j] = alpha * local_spacing
# Nearest-neighbor distance estimate. This is stricter for isolated bases.
sigma_nearest = np.empty(n)
for j in range(n):
if j == 0:
d = centers[1] - centers[0]
elif j == n - 1:
d = centers[-1] - centers[-2]
else:
d = min(centers[j] - centers[j - 1], centers[j + 1] - centers[j])
sigma_nearest[j] = alpha * d
# Slope estimate. Large values are allowed at flat regions and later rejected
# by min(distance, slope), sigma_max, and sigma-dependent Ridge penalty.
sigma_slope = np.full(n, np.nan, dtype=float)
if method in ("slope", "hybrid_slope"):
if x is None or y_smooth is None or dy is None:
raise ValueError("x, y_smooth, and dy are required for slope-based sigma.")
# y_c = np.interp(centers, x, y_smooth)
# dy_c = np.interp(centers, x, dy)
# sigma_slope = alpha_slope * np.abs(y_c) / (np.abs(dy_c) + slope_eps)
amp_c = estimate_local_amplitude(
x,
y_smooth,
centers,
window_points=sigma_amp_window,
)
dy_c = np.interp(centers, x, dy)
sigma_slope = alpha_slope * amp_c / (np.abs(dy_c) + sigma_slope_eps)
if method == "distance":
sigmas = sigma_dist.copy()
elif method == "nearest":
sigmas = sigma_nearest.copy()
elif method == "slope":
sigmas = sigma_slope.copy()
elif method == "hybrid_slope":
sigmas = np.minimum(sigma_nearest, sigma_slope)
else:
raise ValueError(
f"Unknown sigma method: {method}. "
"Use distance, nearest, slope, or hybrid_slope."
)
if sigma_min is not None:
sigmas = np.maximum(sigmas, sigma_min)
if sigma_max is not None:
sigmas = np.minimum(sigmas, sigma_max)
sigmas = np.maximum(sigmas, 1.0e-30)
details = {
"sigma_dist": sigma_dist,
"sigma_nearest": sigma_nearest,
"sigma_slope": sigma_slope,
"sigma_raw": sigmas.copy(),
}
if return_details:
return sigmas, details
return sigmas
# =============================================================================
# Weighted Ridge
# =============================================================================
[ドキュメント]
@dataclass
class WeightedRidgeResult:
"""
概要:
重み付きリッジ回帰の結果を格納するデータクラス。
属性:
:param coef_value: 推定された回帰係数。
:type coef_value: numpy.ndarray
:param penalty_diag_value: 対角化されたペナルティ行列の対角要素。
:type penalty_diag_value: numpy.ndarray
"""
coef_: np.ndarray
penalty_diag_: np.ndarray
[ドキュメント]
def predict(self, xmat):
"""
概要:
入力デザイン行列 xmat を使用して予測値を計算します。
引数:
:param xmat: デザイン行列。
:type xmat: numpy.ndarray
戻り値:
:returns: 予測されたy値。
:rtype: numpy.ndarray
"""
return xmat @ self.coef_
[ドキュメント]
def make_penalty_diag(
n_bg,
sigmas,
lambda_bg=1.0e-8,
lambda_gauss=1.0e-3,
sigma_penalty_power=2.0,
):
"""
概要:
重み付きリッジ回帰のペナルティ行列の対角要素を生成します。
詳細説明:
この関数は、バックグラウンド項とガウス基底項に異なるリッジペナルティを適用します。
ガウス基底のペナルティは、その標準偏差(sigmas)の大きさに応じて重み付けされます。
sigma_penalty_powerが大きいほど、幅の広いガウス基底にはより強いペナルティが課されます。
引数:
:param n_bg: バックグラウンド項の数。
:type n_bg: int
:param sigmas: ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param lambda_bg: バックグラウンド項のリッジペナルティ係数。
:type lambda_bg: float
:param lambda_gauss: ガウス基底項のリッジペナルティ係数。
:type lambda_gauss: float
:param sigma_penalty_power: シグマのペナルティ重み付けのべき乗。
例えば2.0の場合、ペナルティはシグマの二乗に比例します。
:type sigma_penalty_power: float
戻り値:
:returns: ペナルティ行列の対角要素の配列。
:rtype: numpy.ndarray
"""
sigmas = np.asarray(sigmas, dtype=float)
n_g = len(sigmas)
p = n_bg + n_g
penalty = np.zeros(p, dtype=float)
if n_bg > 0:
penalty[:n_bg] = lambda_bg
if n_g > 0:
sigma_ref = np.median(sigmas)
sigma_ref = max(sigma_ref, 1.0e-30)
scale = (sigmas / sigma_ref) ** sigma_penalty_power
penalty[n_bg:] = lambda_gauss * scale
return penalty
[ドキュメント]
def fit_weighted_ridge(
x,
y,
centers,
sigmas,
bg_order,
xmin,
xmax,
lambda_bg=1.0e-8,
lambda_gauss=1.0e-3,
sigma_penalty_power=2.0,
):
"""
概要:
重み付きリッジ回帰モデルをデータに適合させます。
詳細説明:
入力データ x, y、ガウス基底の centers と sigmas、
およびバックグラウンドの次数 bg_order を使用してデザイン行列を構築します。
make_penalty_diag を用いてペナルティ行列の対角要素を生成し、
以下のリッジ回帰の正規方程式を解きます。
(X^T X + diag(penalty)) @ coef = X^T @ y
線形代数ソルバー np.linalg.solve が失敗した場合は、
np.linalg.lstsq にフォールバックします。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param y: y座標の配列。
:type y: numpy.ndarray
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
:param lambda_bg: バックグラウンド項のリッジペナルティ係数。
:type lambda_bg: float
:param lambda_gauss: ガウス基底項のリッジペナルティ係数。
:type lambda_gauss: float
:param sigma_penalty_power: シグマのペナルティ重み付けのべき乗。
:type sigma_penalty_power: float
戻り値:
:returns: 適合されたモデルの結果オブジェクト。
:rtype: WeightedRidgeResult
"""
xmat = design_matrix(x, centers, sigmas, bg_order, xmin, xmax)
n_bg = bg_order + 1 if bg_order >= 0 else 0
penalty_diag = make_penalty_diag(
n_bg,
sigmas,
lambda_bg=lambda_bg,
lambda_gauss=lambda_gauss,
sigma_penalty_power=sigma_penalty_power,
)
amat = xmat.T @ xmat + np.diag(penalty_diag)
bvec = xmat.T @ y
try:
coef = np.linalg.solve(amat, bvec)
except np.linalg.LinAlgError:
print("WARNING: solve failed. Falling back to np.linalg.lstsq.")
coef = np.linalg.lstsq(amat, bvec, rcond=None)[0]
return WeightedRidgeResult(coef, penalty_diag)
[ドキュメント]
def predict(model, x, centers, sigmas, bg_order, xmin, xmax):
"""
概要:
適合済みモデルを使用して、指定されたx値での予測値を計算します。
詳細説明:
新しいx値に対してデザイン行列を再構築し、WeightedRidgeResult オブジェクトの
predict メソッドを呼び出して予測されたy値を計算します。
引数:
:param model: 適合済みモデルオブジェクト。
:type model: WeightedRidgeResult
:param x: 予測を行うx座標の配列。
:type x: numpy.ndarray
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
戻り値:
:returns: 予測されたy値の配列。
:rtype: numpy.ndarray
"""
xmat = design_matrix(x, centers, sigmas, bg_order, xmin, xmax)
return model.predict(xmat)
[ドキュメント]
def predict_background(model, x, bg_order, xmin, xmax):
"""
概要:
適合済みモデルからバックグラウンド成分のみを予測します。
詳細説明:
バックグラウンドのデザイン行列のみを生成し、モデルの係数から
バックグラウンドに対応する部分を取り出して予測値を計算します。
バックグラウンドの次数が-1の場合(バックグラウンドなし)、
ゼロの配列を返します。
引数:
:param model: 適合済みモデルオブジェクト。
:type model: WeightedRidgeResult
:param x: 予測を行うx座標の配列。
:type x: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
戻り値:
:returns: 予測されたバックグラウンド値の配列。
:rtype: numpy.ndarray
"""
phi_bg = background_design_matrix(x, bg_order, xmin, xmax)
n_bg = bg_order + 1 if bg_order >= 0 else 0
if n_bg <= 0:
return np.zeros(len(x), dtype=float)
coef_bg = model.coef_[:n_bg]
return phi_bg @ coef_bg
[ドキュメント]
def calc_metrics(y_ref, y_pred):
"""
概要:
参照値と予測値間の平均絶対誤差 (MAE)、二乗平均平方根誤差 (RMSE)、
および残差平方和 (RSS) を計算します。
引数:
:param y_ref: 参照y値の配列。
:type y_ref: numpy.ndarray
:param y_pred: 予測y値の配列。
:type y_pred: numpy.ndarray
戻り値:
:returns: MAE、RMSE、RSSのタプル。
:rtype: tuple[float, float, float]
"""
mae = np.mean(np.abs(y_pred - y_ref))
rmse = np.sqrt(np.mean((y_pred - y_ref) ** 2))
rss = np.sum((y_pred - y_ref) ** 2)
return mae, rmse, rss
[ドキュメント]
def ridge_effective_df(
x,
centers,
sigmas,
bg_order,
xmin,
xmax,
lambda_bg,
lambda_gauss,
sigma_penalty_power,
):
"""
概要:
リッジ回帰における実効自由度(Effective Degrees of Freedom, EDF)を計算します。
詳細説明:
EDFは、正則化されたモデルの複雑さの尺度であり、以下の式で定義されます。
Tr(X (X^T X + diag(penalty))^-1 X^T)
ただし、Xはデザイン行列、diag(penalty)はペナルティ行列の対角要素です。
線形代数ソルバーが失敗した場合はNaNを返します。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param centers: ガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: ガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
:param lambda_bg: バックグラウンド項のリッジペナルティ係数。
:type lambda_bg: float
:param lambda_gauss: ガウス基底項のリッジペナルティ係数。
:type lambda_gauss: float
:param sigma_penalty_power: シグマのペナルティ重み付けのべき乗。
:type sigma_penalty_power: float
戻り値:
:returns: 実効自由度、または計算失敗時はNaN。
:rtype: float
"""
xmat = design_matrix(x, centers, sigmas, bg_order, xmin, xmax)
n_bg = bg_order + 1 if bg_order >= 0 else 0
penalty_diag = make_penalty_diag(
n_bg,
sigmas,
lambda_bg=lambda_bg,
lambda_gauss=lambda_gauss,
sigma_penalty_power=sigma_penalty_power,
)
amat = xmat.T @ xmat + np.diag(penalty_diag)
try:
return np.trace(xmat @ np.linalg.solve(amat, xmat.T))
except np.linalg.LinAlgError:
return np.nan
[ドキュメント]
def calc_aic_bic(y_ref, y_pred, df):
"""
概要:
赤池情報量基準(AIC)とベイズ情報量基準(BIC)を計算します。
詳細説明:
モデルの適合度と複雑さのバランスを評価するための指標です。
残差平方和(RSS)と実効自由度(df)を使用して計算されます。
RSSが非常に小さい値(ゼロに近い)の場合、数値的な安定性を確保するために下限が適用されます。
引数:
:param y_ref: 参照y値の配列。
:type y_ref: numpy.ndarray
:param y_pred: 予測y値の配列。
:type y_pred: numpy.ndarray
:param df: モデルの実効自由度。
:type df: float
戻り値:
:returns: AICとBICのタプル。
:rtype: tuple[float, float]
"""
n = len(y_ref)
rss = np.sum((y_pred - y_ref) ** 2)
rss = max(rss, 1.0e-300)
aic = n * np.log(rss / n) + 2.0 * df
bic = n * np.log(rss / n) + np.log(n) * df
return aic, bic
# =============================================================================
# Pruning
# =============================================================================
[ドキュメント]
def prune_basis_by_ridge_weight(
x,
y,
centers0,
sigmas0,
bg_order,
xmin,
xmax,
lambda_bg=1.0e-8,
lambda_gauss=1.0e-3,
sigma_penalty_power=2.0,
prune_frac=0.10,
min_basis=5,
metric="mae",
allowed_ratio=1.02,
):
"""
概要:
リッジ回帰の係数に基づいて弱いガウス基底を段階的に除去(プルーニング)します。
詳細説明:
この関数は、与えられた初期のガウス基底のセットから、
重み付きリッジ回帰の係数の絶対値が小さい(寄与が弱い)基底を
段階的に除去していくプロセスを実行します。
各プルーニングステップでは、モデルを再適合させ、MAE、RMSE、AIC、BIC、実効自由度などを計算し記録します。
このプロセスは、残りのガウス基底の数が min_basis に達するまで繰り返されます。
最終的に、指定された metric(例: MAE)と allowed_ratio に基づいて最適なモデルが選択されます。
allowed_ratio は、最良のメトリック値からどれだけ乖離しても許容するかを指定します。
(例: 1.02 は最良値の102%以内であれば許容)。
引数:
:param x: x座標の配列。
:type x: numpy.ndarray
:param y: y座標の配列。
:type y: numpy.ndarray
:param centers0: 初期ガウス基底の中心の配列。
:type centers0: numpy.ndarray
:param sigmas0: 初期ガウス基底の標準偏差の配列。
:type sigmas0: numpy.ndarray
:param bg_order: チェビシェフバックグラウンドの次数。
:type bg_order: int
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
:param lambda_bg: バックグラウンド項のリッジペナルティ係数。
:type lambda_bg: float
:param lambda_gauss: ガウス基底項のリッジペナルティ係数。
:type lambda_gauss: float
:param sigma_penalty_power: シグマのペナルティ重み付けのべき乗。
:type sigma_penalty_power: float
:param prune_frac: 各ステップで除去するガウス基底の割合(0.0から1.0)。
:type prune_frac: float
:param min_basis: 維持するガウス基底の最小数。
:type min_basis: int
:param metric: 最適モデル選択に使用する評価指標 ("mae" または "rmse")。
:type metric: str
:param allowed_ratio: 最良の指標値に対する許容誤差の比率。
:type allowed_ratio: float
戻り値:
:returns: プルーニングステップごとの結果のリスト (results) と、選択された最適なモデルの結果 (selected) のタプル。
:rtype: tuple[list[dict], dict]
"""
centers = np.asarray(centers0, dtype=float).copy()
sigmas = np.asarray(sigmas0, dtype=float).copy()
n_bg = bg_order + 1 if bg_order >= 0 else 0
results = []
step = 0
while len(centers) >= min_basis:
model = fit_weighted_ridge(
x,
y,
centers,
sigmas,
bg_order,
xmin,
xmax,
lambda_bg=lambda_bg,
lambda_gauss=lambda_gauss,
sigma_penalty_power=sigma_penalty_power,
)
y_pred = predict(model, x, centers, sigmas, bg_order, xmin, xmax)
mae, rmse, rss = calc_metrics(y, y_pred)
df = ridge_effective_df(
x,
centers,
sigmas,
bg_order,
xmin,
xmax,
lambda_bg=lambda_bg,
lambda_gauss=lambda_gauss,
sigma_penalty_power=sigma_penalty_power,
)
aic, bic = calc_aic_bic(y, y_pred, df)
results.append(
{
"step": step,
"n_basis": len(centers),
"mae": mae,
"rmse": rmse,
"rss": rss,
"df": df,
"aic": aic,
"bic": bic,
"centers": centers.copy(),
"sigmas": sigmas.copy(),
"model": model,
}
)
if len(centers) == min_basis:
break
gaussian_weights = np.abs(model.coef_[n_bg:])
n_remove = max(1, int(np.ceil(prune_frac * len(centers))))
n_remove = min(n_remove, len(centers) - min_basis)
remove_idx = np.argsort(gaussian_weights)[:n_remove]
keep = np.ones(len(centers), dtype=bool)
keep[remove_idx] = False
centers = centers[keep]
sigmas = sigmas[keep]
step += 1
key = metric.lower()
best_value = min(r[key] for r in results)
candidates = [r for r in results if r[key] <= allowed_ratio * best_value]
selected = min(candidates, key=lambda r: r["n_basis"])
return results, selected
[ドキュメント]
def print_prune_results(results, selected, metric):
"""
概要:
プルーニングの履歴と選択された結果をコンソールに出力します。
詳細説明:
各プルーニングステップでのガウス基底の数、MAE、RMSE、実効自由度、AIC、BICを表示します。
また、metric に基づいて選択された最適なモデルには「<== selected」マークを付けます。
引数:
:param results: プルーニングステップごとの結果のリスト。
:type results: List[Dict]
:param selected: 選択された最適なモデルの結果。
:type selected: Dict
:param metric: 最適モデル選択に使用された評価指標。
:type metric: str
戻り値:
:returns: なし
:rtype: None
"""
print()
print("===== Pruning history =====")
print(" step n_gauss MAE RMSE df AIC BIC")
for r in results:
mark = " <== selected" if r is selected else ""
print(
f"{r['step']:5d} {r['n_basis']:7d} "
f"{r['mae']:10.6g} {r['rmse']:10.6g} "
f"{r['df']:10.4f} {r['aic']:11.4f} {r['bic']:11.4f}"
f"{mark}"
)
print()
print(f"Selection metric = {metric}")
print(f"Selected n_gauss = {selected['n_basis']}")
print(f"Selected MAE = {selected['mae']:.6g}")
print(f"Selected RMSE = {selected['rmse']:.6g}")
# =============================================================================
# Output
# =============================================================================
[ドキュメント]
def build_output_tables(
args,
source,
x,
y,
df_input,
centers0,
sigmas0,
density,
y_smooth,
dy,
d2y,
centers,
sigmas,
model,
xmin,
xmax,
prune_results,
):
"""
概要:
計算結果を格納するためのPandas DataFrameの辞書を構築します。
詳細説明:
以下の情報を含むDataFrameを生成し、Excelの各シートに対応する辞書として返します。
- 入力データ (InputData)
- データとフィット結果 (DataAndFit)
- フィットされたスペクトル (FitSpectrum)
- 個々のガウス成分 (GaussianComponents)
- バックグラウンド係数 (BackgroundCoef)
- 初期ガウス基底の情報 (InitialGaussianBasis)
- 選択されたガウス基底の情報 (SelectedGaussianBasis)
- プルーニング履歴 (PruneHistory、プルーニングモードの場合のみ)
- 計算の概要と設定 (Summary)
フィットスペクトルは、args.n_fit の点数でx範囲にわたって計算されます。
デモデータの場合、真の関数と比較する列も追加されます。
引数:
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param source: データソースを示す文字列(ファイル名または「demo」)。
:type source: str
:param x: 元の入力xデータ。
:type x: numpy.ndarray
:param y: 元の入力yデータ。
:type y: numpy.ndarray
:param df_input: 元の入力データとそのソース列情報を含むDataFrame。
:type df_input: pandas.DataFrame
:param centers0: 初期ガウス基底の中心の配列。
:type centers0: numpy.ndarray
:param sigmas0: 初期ガウス基底の標準偏差の配列。
:type sigmas0: numpy.ndarray
:param density: 適応型中心計算に使用された密度プロファイル。
:type density: numpy.ndarray
:param y_smooth: Savitzky-Golayフィルターで平滑化されたyデータ。
:type y_smooth: numpy.ndarray
:param dy: 平滑化されたyの1階導関数。
:type dy: numpy.ndarray
:param d2y: 平滑化されたyの2階導関数。
:type d2y: numpy.ndarray
:param centers: 最終的に選択されたガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: 最終的に選択されたガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param model: 最終的に適合されたモデルオブジェクト。
:type model: WeightedRidgeResult
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
:param prune_results: プルーニング結果の履歴のリスト(プルーニングモードの場合のみ)。
:type prune_results: Optional[List[Dict]]
戻り値:
:returns: Excelのシート名と対応するDataFrameの辞書。
:rtype: Dict[str, pandas.DataFrame]
"""
x_fit = np.linspace(x.min(), x.max(), args.n_fit)
y_fit = predict(model, x_fit, centers, sigmas, args.bg_order, xmin, xmax)
y_bg_fit = predict_background(model, x_fit, args.bg_order, xmin, xmax)
y_gauss_fit = y_fit - y_bg_fit
# Components on input x as well.
y_pred_input = predict(model, x, centers, sigmas, args.bg_order, xmin, xmax)
y_bg_input = predict_background(model, x, args.bg_order, xmin, xmax)
y_gauss_input = y_pred_input - y_bg_input
mae_input, rmse_input, rss_input = calc_metrics(y, y_pred_input)
df_data = pd.DataFrame({
"x": x,
"y_input": y,
"y_smooth_for_density": y_smooth,
"dy_smooth": dy,
"d2y_smooth": d2y,
"basis_density": density,
"y_fit_at_input_x": y_pred_input,
"background_at_input_x": y_bg_input,
"gaussian_sum_at_input_x": y_gauss_input,
"residual": y - y_pred_input,
})
df_fit = pd.DataFrame({
"x": x_fit,
"y_fit": y_fit,
"background": y_bg_fit,
"gaussian_sum": y_gauss_fit,
})
if str(args.infile).strip().lower() == "demo":
df_fit["true_function_demo"] = true_function(x_fit)
df_fit["true_background_demo"] = true_background(x_fit)
df_fit["true_gaussian_sum_demo"] = true_peaks(x_fit)
n_bg = args.bg_order + 1 if args.bg_order >= 0 else 0
coef_g_for_components = model.coef_[n_bg:]
phi_g_fit = gaussian_design_matrix(x_fit, centers, sigmas)
df_components = pd.DataFrame({"x": x_fit})
for i in range(len(centers)):
df_components[f"G{i:03d}"] = phi_g_fit[:, i] * coef_g_for_components[i]
coef_bg = model.coef_[:n_bg]
coef_g = model.coef_[n_bg:]
penalty_bg = model.penalty_diag_[:n_bg]
penalty_g = model.penalty_diag_[n_bg:]
df_bg_coef = pd.DataFrame({
"component": [f"Chebyshev_T{i}" for i in range(n_bg)],
"coef": coef_bg,
"penalty": penalty_bg,
})
# Initial vs selected Gaussian bases.
selected_set = set(np.round(centers, 14))
# Sigma diagnostic columns for the initial basis. These are useful for
# checking whether the final sigma was limited by the nearest-center rule,
# by the slope rule, or by sigma_min/sigma_max.
try:
_, sigma_details = estimate_variable_sigma(
centers0,
alpha=args.sigma_alpha,
sigma_amp_window=args.sigma_amp_window,
m=args.sigma_m,
sigma_min=args.sigma_min,
sigma_max=args.sigma_max,
method=args.sigma_method,
x=x,
y_smooth=y_smooth,
dy=dy,
alpha_slope=args.sigma_alpha_slope,
slope_eps=args.sigma_slope_eps,
return_details=True,
)
except Exception:
sigma_details = {
"sigma_dist": np.full(len(centers0), np.nan),
"sigma_nearest": np.full(len(centers0), np.nan),
"sigma_slope": np.full(len(centers0), np.nan),
}
df_basis_initial = pd.DataFrame({
"basis_index_initial": np.arange(len(centers0)),
"center_initial": centers0,
"sigma_initial": sigmas0,
"sigma_dist_pm_m": sigma_details["sigma_dist"],
"sigma_nearest": sigma_details["sigma_nearest"],
"sigma_slope": sigma_details["sigma_slope"],
"selected_by_center_match": [bool(np.round(c, 14) in selected_set) for c in centers0],
})
df_basis_selected = pd.DataFrame({
"basis_index_selected": np.arange(len(centers)),
"center": centers,
"sigma": sigmas,
"coef": coef_g,
"abs_coef": np.abs(coef_g),
"penalty": penalty_g,
"area_like_coef_sigma_sqrt2pi": coef_g * sigmas * np.sqrt(2.0 * np.pi),
})
df_prune = None
if prune_results is not None:
df_prune = pd.DataFrame([
{
"step": r["step"],
"n_gaussian_basis": r["n_basis"],
"mae": r["mae"],
"rmse": r["rmse"],
"rss": r["rss"],
"effective_df": r["df"],
"aic": r["aic"],
"bic": r["bic"],
"selected": len(r["centers"]) == len(centers),
}
for r in prune_results
])
df_summary = pd.DataFrame({
"key": [
"source",
"mode",
"bg_order",
"n_input_data",
"n_centers_initial",
"n_gaussian_selected",
"lambda_bg",
"lambda_gauss",
"sigma_penalty_power",
"eps",
"w_grad",
"w_curv",
"sigma_method",
"sigma_alpha",
"sigma_m",
"sigma_alpha_slope",
"sigma_slope_eps",
"sigma_min",
"sigma_max",
"smooth_window",
"smooth_order",
"prune_frac",
"min_basis",
"metric",
"allowed_ratio",
"mae_input",
"rmse_input",
"rss_input",
],
"value": [
source,
args.mode,
args.bg_order,
len(x),
len(centers0),
len(centers),
args.lambda_bg,
args.lambda_gauss,
args.sigma_penalty_power,
args.eps,
args.w_grad,
args.w_curv,
args.sigma_method,
args.sigma_alpha,
args.sigma_m,
args.sigma_alpha_slope,
args.sigma_slope_eps,
args.sigma_min,
args.sigma_max,
args.smooth_window,
args.smooth_order,
args.prune_frac,
args.min_basis,
args.metric,
args.allowed_ratio,
mae_input,
rmse_input,
rss_input,
],
})
return {
"InputData": df_input,
"DataAndFit": df_data,
"FitSpectrum": df_fit,
"GaussianComponents": df_components,
"BackgroundCoef": df_bg_coef,
"InitialGaussianBasis": df_basis_initial,
"SelectedGaussianBasis": df_basis_selected,
"PruneHistory": df_prune,
"Summary": df_summary,
}
[ドキュメント]
def write_excel_output(outfile, tables):
"""
概要:
計算結果のDataFrameをExcelファイルに書き出します。
詳細説明:
tables辞書内の各DataFrameを、辞書のキーをシート名としてExcelファイルに書き込みます。
書き込み後、openpyxlを使用して各シートの列幅を自動調整し、
最初の行(ヘッダー)を固定(フリーズ)して可読性を高めます。
必要なパッケージ(pandas, openpyxl)が見つからない場合は、エラーメッセージを表示し、
インストールを促します。
引数:
:param outfile: 出力Excelファイルのパス。
:type outfile: str
:param tables: Excelのシート名と対応するDataFrameの辞書。
:type tables: Dict[str, pandas.DataFrame]
戻り値:
:returns: なし
:rtype: None
"""
try:
with pd.ExcelWriter(outfile, engine="openpyxl") as writer:
for sheet_name, df in tables.items():
if df is None:
continue
df.to_excel(writer, sheet_name=sheet_name, index=False)
# Simple readable widths.
ws = writer.book[sheet_name]
for col_cells in ws.columns:
col_letter = col_cells[0].column_letter
max_len = 0
for cell in col_cells[:200]:
if cell.value is not None:
max_len = max(max_len, len(str(cell.value)))
ws.column_dimensions[col_letter].width = min(max(max_len + 2, 10), 32)
ws.freeze_panes = "A2"
except Exception as exc:
safe_import_message(exc, "pandas openpyxl")
[ドキュメント]
def plot_results(args, x, y, centers0, sigmas0, density, y_smooth, centers, sigmas,
model, xmin, xmax, prune_results):
"""
概要:
適合結果と診断プロットを生成し、表示または保存します。
詳細説明:
この関数は、診断、メインフィット、分解の3種類のMatplotlibウィンドウを生成します。
診断ウィンドウには、生データ、平滑化データ、基底密度、シグマ候補、選択された中心、
さらにプルーニング時の誤差推移またはフィット時の残差を表示します。
メインフィットウィンドウには、生データ、バックグラウンド、ガウス成分の合計、
全体フィットを表示します。デモデータの場合は、真の関数成分も表示します。
分解ウィンドウには、生データ、全体フィット、ガウス成分の合計、個々のガウス成分を表示します。
プロットは args.show に基づいて表示されるか、args.save_plot に基づいて指定されたファイルに保存されます。
引数:
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param x: 元の入力xデータ。
:type x: numpy.ndarray
:param y: 元の入力yデータ。
:type y: numpy.ndarray
:param centers0: 初期ガウス基底の中心の配列。
:type centers0: numpy.ndarray
:param sigmas0: 初期ガウス基底の標準偏差の配列。
:type sigmas0: numpy.ndarray
:param density: 適応型中心計算に使用された密度プロファイル。
:type density: numpy.ndarray
:param y_smooth: Savitzky-Golayフィルターで平滑化されたyデータ。
:type y_smooth: numpy.ndarray
:param centers: 最終的に選択されたガウス基底の中心の配列。
:type centers: numpy.ndarray
:param sigmas: 最終的に選択されたガウス基底の標準偏差の配列。
:type sigmas: numpy.ndarray
:param model: 最終的に適合されたモデルオブジェクト。
:type model: WeightedRidgeResult
:param xmin: x範囲の最小値。
:type xmin: float
:param xmax: x範囲の最大値。
:type xmax: float
:param prune_results: プルーニング結果の履歴のリスト。
:type prune_results: Optional[List[Dict]]
戻り値:
:returns: なし。
:rtype: None
"""
x_fit = np.linspace(x.min(), x.max(), args.n_fit)
y_fit = predict(model, x_fit, centers, sigmas, args.bg_order, xmin, xmax)
y_bg_fit = predict_background(model, x_fit, args.bg_order, xmin, xmax)
y_gauss_fit = y_fit - y_bg_fit
n_bg = args.bg_order + 1 if args.bg_order >= 0 else 0
coef_g = model.coef_[n_bg:]
phi_g = gaussian_design_matrix(x_fit, centers, sigmas)
g_each = phi_g * coef_g[None, :]
# Recompute sigma diagnostic candidates for plotting, when possible.
try:
_, sigma_details = estimate_variable_sigma(
centers0,
alpha=args.sigma_alpha,
sigma_amp_window=args.sigma_amp_window,
m=args.sigma_m,
sigma_min=args.sigma_min,
sigma_max=args.sigma_max,
method=args.sigma_method,
x=x,
y_smooth=y_smooth,
dy=np.gradient(y_smooth, x),
alpha_slope=args.sigma_alpha_slope,
slope_eps=args.sigma_slope_eps,
return_details=True,
)
except Exception:
sigma_details = None
# -------------------------------------------------------------------------
# Window A: diagnostics
# -------------------------------------------------------------------------
nrows_a = 3
fig_a, axes = plt.subplots(nrows_a, 1, figsize=(8, 9), sharex=False)
fig_a.suptitle("Diagnostics: data, basis density, sigma, pruning history")
ax = axes[0]
ax.scatter(x, y, s=16, label="input data")
ax.plot(x, y_smooth, "-", label="smoothed for density/sigma")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.legend()
ax.grid(True)
ax = axes[1]
ax.plot(x, density, "-", label="basis density")
ax.set_xlabel("x")
ax.set_ylabel("density")
ax.grid(True)
ax2 = ax.twinx()
if sigma_details is not None:
ax2.plot(centers0, sigma_details["sigma_nearest"], ":", label="sigma nearest candidate")
if np.isfinite(sigma_details["sigma_slope"]).any():
ax2.plot(centers0, sigma_details["sigma_slope"], "--", label="sigma slope candidate")
ax2.plot(centers0, sigmas0, "o-", label="initial sigma")
ax2.plot(centers, sigmas, "s", label="selected sigma")
ax2.set_ylabel("sigma")
for c in centers:
ax.axvline(c, color="0.85", linewidth=0.6, zorder=0)
lines1, labels1 = ax.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax.legend(lines1 + lines2, labels1 + labels2, loc="best")
ax = axes[2]
if args.mode == "prune" and prune_results is not None:
n_basis = np.array([r["n_basis"] for r in prune_results])
mae = np.array([r["mae"] for r in prune_results])
rmse = np.array([r["rmse"] for r in prune_results])
ax.plot(n_basis, mae, "o-", label="MAE")
ax.plot(n_basis, rmse, "s-", label="RMSE")
ax.axvline(len(centers), linestyle="--", label="selected")
ax.invert_xaxis()
ax.set_xlabel("number of Gaussian basis")
ax.set_ylabel("error")
ax.legend()
else:
y_pred_input = predict(model, x, centers, sigmas, args.bg_order, xmin, xmax)
residual = y - y_pred_input
ax.plot(x, residual, ".-", label="residual")
ax.axhline(0.0, linestyle="--", linewidth=1)
ax.set_xlabel("x")
ax.set_ylabel("residual")
ax.legend()
ax.grid(True)
fig_a.tight_layout()
# -------------------------------------------------------------------------
# Window B: main fit
# -------------------------------------------------------------------------
fig_b, ax = plt.subplots(figsize=(9, 5))
fig_b.suptitle("Main fit: data, background, Gaussian sum, total fit")
ax.scatter(x, y, s=16, label="input data")
ax.plot(x_fit, y_bg_fit, ":", label="background")
ax.plot(x_fit, y_gauss_fit, "-", label="Gaussian sum")
ax.plot(x_fit, y_fit, "-", linewidth=2, label="total fit = background + Gaussian sum")
if str(args.infile).strip().lower() == "demo":
ax.plot(x_fit, true_function(x_fit), "--", label="true demo function")
ax.plot(x_fit, true_background(x_fit), "--", label="true demo background")
ax.plot(x_fit, true_peaks(x_fit), "--", label="true demo Gaussian sum")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.legend()
ax.grid(True)
fig_b.tight_layout()
# -------------------------------------------------------------------------
# Window C: decomposition
# -------------------------------------------------------------------------
fig_c, ax = plt.subplots(figsize=(9, 5))
fig_c.suptitle("Decomposition: individual Gaussian components Gi(x), no background")
ax.scatter(x, y, s=14, label="input data")
ax.plot(x_fit, y_fit, "-", linewidth=2, label="total fit")
ax.plot(x_fit, y_gauss_fit, "-", linewidth=2, label="Gaussian sum")
# Plot selected Gaussian components. Suppress individual labels to avoid
# unreadable legends when many bases remain.
for i in range(g_each.shape[1]):
ax.plot(x_fit, g_each[:, i], "--", linewidth=1, alpha=0.55)
ax.axhline(0.0, linestyle=":", linewidth=1)
ax.set_xlabel("x")
ax.set_ylabel("y or component amplitude")
ax.legend()
ax.grid(True)
fig_c.tight_layout()
if args.save_plot:
base, ext = os.path.splitext(args.plotfile)
if ext == "":
ext = ".png"
files = [
f"{base}_diagnostics{ext}",
f"{base}_mainfit{ext}",
f"{base}_decomposition{ext}",
]
for fig, filename in zip([fig_a, fig_b, fig_c], files):
fig.savefig(filename, dpi=200)
print(f"Saved plot: {filename}")
if args.show:
plt.show(block=False)
plt.pause(0.1)
input("Press ENTER to close figures>>\n")
plt.close("all")
else:
plt.close(fig_a)
plt.close(fig_b)
plt.close(fig_c)
# =============================================================================
# Main
# =============================================================================
[ドキュメント]
def main():
"""
概要:
メイン実行関数。コマンドライン引数の解析から結果の出力まで、プログラムの全処理を制御します。
詳細説明:
1. argparse を使用して、入力/出力ファイル、モデルパラメータ、プルーニング設定、
プロットオプションなどのコマンドライン引数を定義および解析します。
2. read_input_data を呼び出して、Excelファイルまたはデモデータからx-yデータを読み込みます。
3. adaptive_centers_from_derivatives と estimate_variable_sigma を使用して、
適応的なガウス基底の中心と標準偏差を初期化します。
4. args.mode に応じて、モデルの適合 (fit_weighted_ridge) または
プルーニング (prune_basis_by_ridge_weight) を実行します。
5. 最終的な適合結果、モデルパラメータ、および評価指標をコンソールに出力します。
6. build_output_tables を使用して、結果のDataFrameの辞書を構築します。
7. write_excel_output を使用して、すべての結果をExcelファイルに書き込みます。
8. plot_results を使用して、適合結果と診断プロットを表示または保存します。
引数:
:param なし: この関数は引数を直接受け取りません。すべての設定はコマンドライン引数 sys.argv から解析されます。
:type なし: None
戻り値:
:returns: なし
:rtype: None
例外:
Exception: 予期せぬエラーが発生した場合、エラーメッセージとトレースバックを表示し、プログラムを終了します。
"""
parser = argparse.ArgumentParser(
description="Adaptive Gaussian basis + background + weighted Ridge + pruning for spectra"
)
parser.add_argument("--infile", type=str, default="demo",
help="Input Excel file. Use 'demo' for generated data.")
parser.add_argument("--sheet", type=str, default=0,
help="Excel sheet name or index. Default: 0")
parser.add_argument("--xcol", type=str, default=None,
help="x column name or zero-based column index. Default: first column")
parser.add_argument("--ycol", type=str, default=None,
help="y column name or zero-based column index. Default: second column")
parser.add_argument("--outfile", type=str, default=None,
help="Output Excel file. Default: created from infile name.")
parser.add_argument("--mode", type=str, default="prune", choices=["fit", "prune"])
parser.add_argument("--n-data", type=int, default=180,
help="Number of generated data points for --infile demo.")
parser.add_argument("--n-fit", type=int, default=1000,
help="Number of points in exported fitted spectra.")
parser.add_argument("--n-centers", type=int, default=60)
parser.add_argument("--noise", type=float, default=0.06)
parser.add_argument("--seed", type=int, default=1)
parser.add_argument("--bg-order", type=int, default=3,
help="Chebyshev background order. Use -1 for no background.")
parser.add_argument("--lambda-bg", type=float, default=1.0e-8)
parser.add_argument("--lambda-gauss", type=float, default=1.0e-3)
parser.add_argument("--sigma-penalty-power", type=float, default=2.0)
parser.add_argument("--eps", type=float, default=0.15)
parser.add_argument("--w-grad", type=float, default=0.3)
parser.add_argument("--w-curv", type=float, default=1.0)
parser.add_argument("--sigma-method", type=str, default="nearest",
# parser.add_argument("--sigma-method", type=str, default="hybrid_slope",
choices=["distance", "nearest", "slope", "hybrid_slope"],
help="Sigma estimation method. hybrid_slope uses min(nearest-distance, slope estimate).")
parser.add_argument("--sigma-alpha", type=float, default=1.6,
help="Distance-based sigma multiplier.")
parser.add_argument("--sigma-m", type=int, default=2,
help="Neighbor count for conventional distance sigma.")
parser.add_argument("--sigma-alpha-slope", type=float, default=1.0,
help="Slope-based sigma multiplier.")
parser.add_argument(
"--sigma-amp-window",
type=int,
default=21,
help="Window points for local amplitude estimation used in slope-based sigma.",
)
parser.add_argument("--sigma-slope-eps", type=float, default=1.0e-6,
help="Small denominator for |y|/(|dy/dx|+eps) sigma estimate.")
parser.add_argument("--sigma-min", type=float, default=None)
parser.add_argument("--sigma-max", type=float, default=None)
parser.add_argument("--smooth-window", type=int, default=21)
parser.add_argument("--smooth-order", type=int, default=3)
parser.add_argument("--prune-frac", type=float, default=0.10)
parser.add_argument("--min-basis", type=int, default=5)
parser.add_argument("--metric", type=str, default="mae", choices=["mae", "rmse"])
parser.add_argument("--allowed-ratio", type=float, default=1.02)
parser.add_argument("--show", type=int, default=1, choices=[0, 1])
parser.add_argument("--save-plot", type=int, default=0, choices=[0, 1])
parser.add_argument("--plotfile", type=str, default="adaptive_gaussian_ridge_plot.png")
args = parser.parse_args()
args.show = bool(args.show)
args.save_plot = bool(args.save_plot)
if args.outfile is None:
args.outfile = make_default_outfile(args.infile)
# Make output path relative to current directory unless user gives directory.
if os.path.dirname(args.outfile):
os.makedirs(os.path.dirname(args.outfile), exist_ok=True)
x, y, df_input, source = read_input_data(args)
xmin = np.min(x)
xmax = np.max(x)
centers0, density, y_smooth, dy, d2y = adaptive_centers_from_derivatives(
x,
y,
n_centers=args.n_centers,
eps=args.eps,
w_grad=args.w_grad,
w_curv=args.w_curv,
smooth_window=args.smooth_window,
smooth_order=args.smooth_order,
)
sigmas0 = estimate_variable_sigma(
centers0,
alpha=args.sigma_alpha,
sigma_amp_window=args.sigma_amp_window,
m=args.sigma_m,
sigma_min=args.sigma_min,
sigma_max=args.sigma_max,
method=args.sigma_method,
x=x,
y_smooth=y_smooth,
dy=dy,
alpha_slope=args.sigma_alpha_slope,
slope_eps=args.sigma_slope_eps,
)
if args.mode == "fit":
model = fit_weighted_ridge(
x,
y,
centers0,
sigmas0,
args.bg_order,
xmin,
xmax,
lambda_bg=args.lambda_bg,
lambda_gauss=args.lambda_gauss,
sigma_penalty_power=args.sigma_penalty_power,
)
centers = centers0
sigmas = sigmas0
prune_results = None
else:
prune_results, selected = prune_basis_by_ridge_weight(
x,
y,
centers0,
sigmas0,
args.bg_order,
xmin,
xmax,
lambda_bg=args.lambda_bg,
lambda_gauss=args.lambda_gauss,
sigma_penalty_power=args.sigma_penalty_power,
prune_frac=args.prune_frac,
min_basis=args.min_basis,
metric=args.metric,
allowed_ratio=args.allowed_ratio,
)
print_prune_results(prune_results, selected, args.metric)
centers = selected["centers"]
sigmas = selected["sigmas"]
model = selected["model"]
y_pred_input = predict(model, x, centers, sigmas, args.bg_order, xmin, xmax)
mae_input, rmse_input, _ = calc_metrics(y, y_pred_input)
print()
print("===== Adaptive Gaussian Weighted Ridge Regression =====")
print(f"source = {source}")
print(f"outfile = {args.outfile}")
print(f"mode = {args.mode}")
print(f"bg_order = {args.bg_order}")
print(f"n_input_data = {len(x)}")
print(f"n_centers_initial = {len(centers0)}")
print(f"n_gauss_selected = {len(centers)}")
print(f"lambda_bg = {args.lambda_bg}")
print(f"lambda_gauss = {args.lambda_gauss}")
print(f"sigma_penalty_power = {args.sigma_penalty_power}")
print(f"sigma_method = {args.sigma_method}")
print(f"sigma_alpha = {args.sigma_alpha}")
print(f"sigma_alpha_slope = {args.sigma_alpha_slope}")
print(f"sigma_min = {args.sigma_min}")
print(f"sigma_max = {args.sigma_max}")
print(f"MAE input = {mae_input:.6g}")
print(f"RMSE input = {rmse_input:.6g}")
tables = build_output_tables(
args,
source,
x,
y,
df_input,
centers0,
sigmas0,
density,
y_smooth,
dy,
d2y,
centers,
sigmas,
model,
xmin,
xmax,
prune_results,
)
write_excel_output(args.outfile, tables)
print(f"Saved Excel output: {args.outfile}")
plot_results(
args,
x,
y,
centers0,
sigmas0,
density,
y_smooth,
centers,
sigmas,
model,
xmin,
xmax,
prune_results,
)
if __name__ == "__main__":
try:
main()
except Exception:
print("ERROR: program terminated unexpectedly.")
traceback.print_exc()
input("\nPress ENTER to terminate>>\n")
sys.exit(1)