"""
光電子分光スペクトルのフィッティング、シミュレーション、初期値推定を行うスクリプト。
測定された光電子分光スペクトルに対して、フェルミ・ディラック分布関数と状態密度、
そしてガウス関数で表現される装置関数を組み合わせたモデルを適用し、
非線形最小二乗法によるフィッティング、あるいはパラメータ指定によるシミュレーションを実行します。
また、フィッティングの初期値をスペクトルデータから自動推定する機能も提供します。
:doc:`ef_fit_usage`
"""
import chardet
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from numpy import sqrt, log, exp
import argparse
import os
import traceback
from typing import Dict, Any, List, Tuple, Union
import re
plt.rcParams["font.family"] = "MS Gothic"
# --- デフォルト値 ---
EF0_def = 0.0
BG0_def = 0.0
w0_def = 0.20
I00_def = 200000.0
DOSa0_def = 0.0
# フィッティングパラメータの範囲、初期値、最適化ID、変換タイプ、EPSを定義
fit_params = {
"EF": { "optid": 1, "convert": "", "eps": 1.0e-5, "x0": EF0_def, "min": None, "max": None, "kpenalty": 1e8 },
"BG": { "optid": 1, "convert": "", "eps": 1.0e-3, "x0": BG0_def, "min": None, "max": None, "kpenalty": 1e8 },
"w": { "optid": 1, "convert": "log", "eps": 1.0e-5, "x0": w0_def, "min": 0.01, "max": 0.5, "kpenalty": 1e8 },
"I0": { "optid": 1, "convert": "log", "eps": 1.0e+0, "x0": I00_def, "min": 0.0, "max": None, "kpenalty": 1e8 },
"DOSa": { "optid": 0, "convert": "", "eps": 1.0e-5, "x0": DOSa0_def, "min": None, "max": None, "kpenalty": 1e8 },
}
PRINT_INTERVAL_def = 5
NMAXITER_def = 1000
# パラメータ名のリストをグローバルに定義
ALL_PARAM_NAMES = list(fit_params.keys())
OPT_PARAM_NAMES = [name for name, info in fit_params.items() if info["optid"] == 1]
FIXED_PARAM_NAMES = [name for name, info in fit_params.items() if info["optid"] == 0]
[ドキュメント]
def terminate():
"""
プログラムを終了する前にユーザー入力を待機する。
詳細説明:
ユーザーがEnterキーを押すまで処理をブロックし、その後プログラムを終了します。
:returns: None
"""
input("\nPress ENTER to terminate>>\n")
exit()
# -------------------------------
# 変換関数
# -------------------------------
[ドキュメント]
def convert_params(params_dict: Dict[str, float]) -> np.ndarray:
"""
全パラメータ辞書から、最適化対象のみを抽出し、logスケールに変換して配列として返す。
詳細説明:
`fit_params` に定義された `optid` が1のパラメータのみを抽出し、
`convert` 設定が "log" の場合は自然対数に変換します。
:param params_dict: パラメータ名とその値を含む辞書。
:type params_dict: Dict[str, float]
:returns: 対数変換された最適化対象パラメータのNumPy配列。
:rtype: numpy.ndarray
"""
opt_x0 = []
for name in OPT_PARAM_NAMES:
value = params_dict[name]
info = fit_params[name]
if info["convert"] == "log":
if value <= 0:
raise ValueError(f"Parameter '{name}' must be positive for log transformation.")
opt_x0.append(log(value))
else:
opt_x0.append(value)
return np.array(opt_x0)
[ドキュメント]
def recover_params(opt_params_array: np.ndarray, all_initial_params_dict: Dict[str, float]) -> List[float]:
"""
最適化パラメータ配列と、固定パラメータの初期値から、全パラメータの物理量リストを再構成。
詳細説明:
最適化されたパラメータ配列 `opt_params_array` を `fit_params` の `convert` 設定に基づいて
元の物理量スケール(指数関数など)に復元します。
固定パラメータは `all_initial_params_dict` から取得し、全てのパラメータを結合して
`ALL_PARAM_NAMES` で定義された順序のリストとして返します。
:param opt_params_array: 最適化されたパラメータのNumPy配列。
:type opt_params_array: numpy.ndarray
:param all_initial_params_dict: 固定パラメータを含む、全てのパラメータの初期値辞書。
:type all_initial_params_dict: Dict[str, float]
:returns: 全パラメータ ([EF, BG, w, I0, DOSa]の順) の物理量リスト。
:rtype: List[float]
"""
all_params_dict = all_initial_params_dict.copy()
opt_params_idx = 0
for name in OPT_PARAM_NAMES:
value_raw = opt_params_array[opt_params_idx]
info = fit_params[name]
if info["convert"] == "log":
value_recovered = exp(value_raw)
else:
value_recovered = value_raw
all_params_dict[name] = value_recovered
opt_params_idx += 1
return [all_params_dict[name] for name in ALL_PARAM_NAMES]
[ドキュメント]
def kG2fwhm(kG: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
ガウス関数のピーク位置の分散を示すパラメータkGからFWHM wに変換する。
詳細説明:
ガウス関数 `exp(-kG * E^2)` の半値全幅を計算します。
kGが0以下の場合は数値的に非常に大きなFWHMを返します。
NumPy配列に対応しています。
:param kG: ガウス関数の分散パラメータ。
:type kG: Union[float, numpy.ndarray]
:returns: ガウス関数の半値全幅。
:rtype: Union[float, numpy.ndarray]
"""
return np.where(kG <= 0, 1e10, 2.0 * sqrt(log(2.0) / kG))
[ドキュメント]
def fwhm2kG(w: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
ガウス関数のFWHM wからkGに変換する。
詳細説明:
半値全幅 `w` からガウス関数 `exp(-kG * E^2)` の `kG` を計算します。
wが0以下の場合は数値的に非常に大きなkGを返します。
NumPy配列に対応しています。
:param w: ガウス関数の半値全幅。
:type w: Union[float, numpy.ndarray]
:returns: ガウス関数の分散パラメータ。
:rtype: Union[float, numpy.ndarray]
"""
return np.where(w <= 0, 1e20, log(2.0) / (0.5 * w)**2)
# -------------------------------
# 引数初期化
# -------------------------------
[ドキュメント]
def initialize() -> argparse.Namespace:
"""
コマンドライン引数をパースし、設定をオブジェクトとして返す。
詳細説明:
`argparse` モジュールを使用して、入力ファイル名、モード(fit, sim, init)、
最適化アルゴリズム、温度、各パラメータの初期値、フィット範囲、
強度スケーリング係数、最大反復回数、ステップ出力間隔などを定義します。
:returns: コマンドライン引数を格納したNamespaceオブジェクト。
:rtype: argparse.Namespace
"""
parser = argparse.ArgumentParser(description="光電子分光スペクトル解析")
parser.add_argument("filename", help="入力ファイル(csv, xlsx, txt)")
parser.add_argument("--mode", choices=["fit", "sim", "init"], default="fit", help="モード: fit, sim, or init (初期値推定)")
parser.add_argument("--method", default="Nelder-Mead", help="最適化アルゴリズム (デフォルト Nelder-Mead")
parser.add_argument("--T", type=float, default=300.0, help="温度 [K]")
parser.add_argument("--EF", type=float, default=None, help=f"EF [eV] (デフォルト: {fit_params['EF']['x0']})")
parser.add_argument("--BG", type=float, default=None, help=f"BG (デフォルト: {fit_params['BG']['x0']})")
parser.add_argument("--w", type=float, default=None, help=f"装置関数fwhm (デフォルト: {fit_params['w']['x0']})")
parser.add_argument("--I0", type=float, default=None, help=f"シグナル強度I0 (デフォルト: {fit_params['I0']['x0']})")
parser.add_argument("--DOSa", type=float, default=None, help=f"DOSの線形傾き (デフォルト: {fit_params['DOSa']['x0']})")
parser.add_argument("--Efit_min", type=str, default=None, help="フィット範囲下限 [eV]")
parser.add_argument("--Efit_max", type=str, default=None, help="フィット範囲上限 [eV]")
parser.add_argument("--Escale", type=float, default=-1.0, help="強度スケーリング係数")
parser.add_argument("--nmaxiter", type=int, default=NMAXITER_def, help=f"最適化の最大反復回数 (デフォルト: {NMAXITER_def})")
parser.add_argument("--print_interval", type=int, default=PRINT_INTERVAL_def, help=f"ステップ出力間隔 (デフォルト: {PRINT_INTERVAL_def})")
return parser.parse_args()
# -------------------------------
# CSVパラメータファイルの読み書き
# -------------------------------
[ドキュメント]
def get_prm_csv_path(filename: str) -> str:
"""
元のデータファイル名から、対応するパラメータCSVファイルのパスを生成する。
詳細説明:
入力ファイル名の拡張子を除き、そこに "_parameters.csv" を付加して
パラメータ保存用のCSVファイルパスを作成します。
:param filename: データファイルのパス。
:type filename: str
:returns: パラメータCSVファイルのパス。
:rtype: str
"""
base, ext = os.path.splitext(filename)
return base + "_parameters.csv"
[ドキュメント]
def load_params(prm_csv_path: str) -> pd.DataFrame:
"""
指定されたパスからパラメータ設定をCSVファイルとして読み込むか、存在しない場合はデフォルト値で初期化する。
詳細説明:
パラメータCSVファイルが存在し、読み込みに成功した場合はその内容をDataFrameとして返します。
読み込み失敗またはファイルが存在しない場合は、スクリプト内で定義されている `fit_params` の
デフォルト値 (`x0`) を使用してDataFrameを生成し、返します。
この際、`optid`, `convert`, `eps`, `min`, `max`, `kpenalty` の各列も `fit_params` から補完されます。
:param prm_csv_path: パラメータCSVファイルのパス。
:type prm_csv_path: str
:returns: パラメータ設定を格納したDataFrame。
:rtype: pandas.DataFrame
"""
if os.path.exists(prm_csv_path):
try:
df = pd.read_csv(prm_csv_path, index_col='Parameter')
for name, info in fit_params.items():
if name in df.index:
for col in ['optid', 'convert', 'eps', 'min', 'max', 'kpenalty']:
# 既存のCSV列にない場合はfit_paramsからコピー、Noneの場合はNaN
df.loc[name, col] = info[col] if info[col] is not None else np.nan
print(f"Status: Parameters loaded from '{os.path.basename(prm_csv_path)}'.")
return df
except Exception as e:
print(f"Warning: Failed to read CSV: {e}. Using code defaults.")
pass
# ファイルが存在しない、または読み込み失敗した場合、デフォルト値でDataFrameを作成
data = []
for name, info in fit_params.items():
row = {
'Parameter': name,
'optid': info['optid'],
'convert': info['convert'],
'eps': info['eps'],
'min': info['min'] if info['min'] is not None else np.nan,
'max': info['max'] if info['max'] is not None else np.nan,
'kpenalty': info['kpenalty'],
'Value': info['x0'],
}
data.append(row)
df = pd.DataFrame(data).set_index('Parameter')
print(f"Status: Initialized default parameters.")
return df
[ドキュメント]
def save_params(prm_csv_path: str, df_params: pd.DataFrame) -> None:
"""
DataFrame形式のパラメータ設定をCSVファイルに保存する。
:param prm_csv_path: 保存先のパラメータCSVファイルのパス。
:type prm_csv_path: str
:param df_params: 保存するパラメータ設定を含むDataFrame。
:type df_params: pandas.DataFrame
:returns: None
"""
try:
df_params.to_csv(prm_csv_path, index=True, float_format='%.6e')
print(f"Status: Parameters saved to '{os.path.basename(prm_csv_path)}'.")
except Exception as e:
print(f"Error: Failed to save parameters to CSV: {e}")
traceback.print_exc()
[ドキュメント]
def update_params(args: argparse.Namespace, df_params: pd.DataFrame) -> Dict[str, float]:
"""
コマンドライン引数、CSVファイルから全パラメータの初期値を決定し、辞書で返す。
詳細説明:
まず `df_params` から 'Value' 列の値を読み込み、その後コマンドライン引数 (`args`) で
指定された値があればそれらを優先して上書きします。
`w` と `I0` が正の値であることを確認し、`w` から `kG` を計算して辞書に追加します。
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param df_params: パラメータCSVから読み込まれたDataFrame。
:type df_params: pandas.DataFrame
:returns: 更新された全パラメータの辞書。
:rtype: Dict[str, float]
:raises ValueError: `w` (FWHM) または `I0` (シグナル強度) が正でない場合。
"""
initial_params_dict = {}
for name, info in fit_params.items():
# 1. CSVから 'Value' を取得
val = df_params.loc[name, 'Value']
# 2. コマンドライン引数で上書き
arg_val = getattr(args, name, None)
if arg_val is not None:
val = arg_val
initial_params_dict[name] = float(val)
if initial_params_dict["w"] <= 0 or initial_params_dict["I0"] <= 0:
raise ValueError("w (FWHM) and I0 must be positive.")
w0 = initial_params_dict["w"]
initial_params_dict["kG"] = fwhm2kG(w0)
return initial_params_dict
[ドキュメント]
def print_params(message: str, params_dict: Dict[str, float]) -> None:
"""
指定されたメッセージと共に、全パラメータの現在値をコンソールに出力する。
詳細説明:
`ALL_PARAM_NAMES` に従って各パラメータとその値を整形して表示します。
`w` が存在する場合は、対応する `kG` の値も併せて表示します。
:param message: 出力の前に表示するメッセージ。
:type message: str
:param params_dict: 出力するパラメータ名と値の辞書。
:type params_dict: Dict[str, float]
:returns: None
"""
print(message)
w = params_dict.get('w', None)
kG = fwhm2kG(w) if w is not None and w > 0 else 0.0
for name in ALL_PARAM_NAMES:
val = params_dict.get(name)
print(f" {name}={val:.6f}")
if w is not None:
print(f" w ={w:.6f} (kG={kG:.6f})")
# -------------------------------
# スペクトルデータ読み込み
# -------------------------------
[ドキュメント]
def read_haxpes_spring8(file_path: str) -> Tuple[np.ndarray, np.ndarray]:
"""
Spring-8 HAXPES装置の特定のフォーマット(`[Info]` ヘッダーを持つ)のデータファイルを読み込む。
詳細説明:
ファイルエンコーディングを自動検出します。
ヘッダー情報からデータの次元(行数、列数)を解析し、`[Data 1]` ブロックから
エネルギーと平均信号強度を抽出します。複数列の強度データは平均化されます。
:param file_path: 読み込むファイルのパス。
:type file_path: str
:returns: エネルギーと平均信号強度のNumPy配列のタプル。
:rtype: Tuple[numpy.ndarray, numpy.ndarray]
"""
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
except FileNotFoundError:
print(f"Error: File not found: {file_path}")
return np.array([]), np.array([])
except Exception as e:
print(f"Error reading/decoding file: {e}")
return np.array([]), np.array([])
detected = chardet.detect(raw_data)
encoding = detected['encoding']
confidence = detected['confidence']
# print(f"Detected encoding: {encoding} (Confidence: {confidence:.2f})")
if encoding is None:
print("Warning: Failed to detect character encoding. Use python default")
# encoding = 'utf-8'
lines = []
try:
content = raw_data.decode(encoding)
lines = content.splitlines()
except Exception as e:
print(f"Error decoding data: {e}")
return np.array([]), np.array([])
# ---------------------------------------------------------
# 2. Typeチェック
# ---------------------------------------------------------
if not lines or '[Info]' not in lines[0]:
print("Error: Invalid file format. Header '[Info]' missing in the first line.")
return np.array([]), np.array([])
# ---------------------------------------------------------
# 3. ヘッダー情報の解析
# ---------------------------------------------------------
dim1_size = 0 # データ行数
dim2_size = 0 # データ列数
data_start_idx = -1
for i, line in enumerate(lines):
line = line.strip()
if line.startswith('Dimension 1 size='):
dim1_size = int(line.split('=')[1])
elif line.startswith('Dimension 2 size='):
dim2_size = int(line.split('=')[1])
elif line == '[Data 1]':
data_start_idx = i + 1
break
if dim1_size == 0:
print("Error: Failed to parse dimension1 size.")
return np.array([]), np.array([])
if dim2_size == 0:
dim2_size = 1
if data_start_idx == -1:
print("Error: Failed to find [Data 1] tag.")
return np.array([]), np.array([])
# ---------------------------------------------------------
# 4. データ領域の処理
# ---------------------------------------------------------
E_list = []
I_list = []
# 必要な行数だけ切り出す
data_block = lines[data_start_idx : data_start_idx + dim1_size]
for row_idx, line in enumerate(data_block):
parts = line.split()
# 列数チェック
expected_cols = 1 + dim2_size
if len(parts) < expected_cols:
print(f"Warning: Line {data_start_idx + row_idx + 1} insufficient columns.")
continue
try:
# 1列目: エネルギー
energy = float(parts[0])
# 2列目以降: 信号強度
intensities = [float(x) for x in parts[1 : expected_cols]]
# 平均化
avg_intensity = sum(intensities) / len(intensities)
E_list.append(energy)
I_list.append(avg_intensity)
except ValueError:
print(f"Warning: Conversion error at line {data_start_idx + row_idx + 1}.")
continue
return np.array(E_list, dtype='float64'), np.array(I_list, dtype='float64')
[ドキュメント]
def read_hapes_xy(file_path: str, read_ndata: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
ヘッダー情報が少ない(またはない)XY形式のデータファイルを読み込む。
詳細説明:
ファイルエンコーディングを自動検出します。
`read_ndata` がTrueの場合、ファイルの3行目からデータ読み込みを開始し、
Falseの場合は2行目からデータ読み込みを開始します。
各行はスペース区切りのエネルギーと信号強度の2列データとして解析されます。
:param file_path: 読み込むファイルのパス。
:type file_path: str
:param read_ndata: データが始まる前にデータ行数情報があるかを示すフラグ。
Trueの場合、2行をスキップして3行目から読み込みます。
Falseの場合、1行をスキップして2行目から読み込みます。
:type read_ndata: bool
:returns: エネルギーと信号強度のNumPy配列のタプル。
:rtype: Tuple[numpy.ndarray, numpy.ndarray]
"""
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
except FileNotFoundError:
print(f"Error: File not found: {file_path}")
return np.array([]), np.array([])
except Exception as e:
print(f"Error reading/decoding file: {e}")
return np.array([]), np.array([])
detected = chardet.detect(raw_data)
encoding = detected['encoding']
confidence = detected['confidence']
# print(f"Detected encoding: {encoding} (Confidence: {confidence:.2f})")
if encoding is None:
print("Warning: Failed to detect character encoding. Use python default")
# encoding = 'utf-8'
lines = []
try:
content = raw_data.decode(encoding)
lines = content.splitlines()
except Exception as e:
print(f"Error decoding data: {e}")
return np.array([]), np.array([])
if read_ndata:
idx = 2
else:
idx = 1
E_list = []
I_list = []
while True:
if idx >= len(lines) or lines[idx] is None or lines[idx].strip() == "": break
_aa = lines[idx].split()
if len(_aa) < 2: break
# print("_aa=", _aa) # デバッグ用コメントアウト
E_list.append(float(_aa[0]))
I_list.append(float(_aa[1]))
idx += 1
return np.array(E_list, dtype='float64'), np.array(I_list, dtype='float64')
[ドキュメント]
def read_haxpes_txt(file_path: str) -> Tuple[np.ndarray, np.ndarray]:
"""
`.txt` 拡張子のデータファイルを読み込み、その内容に応じて適切なパーサーを呼び出す。
詳細説明:
ファイルの最初の数行を読み込み、以下の条件に基づいてパーサーを決定します。
- 1行目に `[Info]` が含まれる場合: `read_haxpes_spring8` を呼び出す。
- 2行目が数字のみの場合: `read_hapes_xy` (引数 `read_ndata=True`) を呼び出す。
- それ以外の場合: `read_hapes_xy` (引数 `read_ndata=False`) を呼び出す。
:param file_path: 読み込むファイルのパス。
:type file_path: str
:returns: エネルギーと信号強度のNumPy配列のタプル。
:rtype: Tuple[numpy.ndarray, numpy.ndarray]
"""
try:
with open(file_path, 'r') as f:
line1 = f.readline()
line2 = f.readline()
line3 = f.readline()
except FileNotFoundError:
print(f"Error: File not found: {file_path}")
return np.array([]), np.array([])
except Exception as e:
print(f"Error reading/decoding file: {e}")
return np.array([]), np.array([])
if '[Info]' in line1:
return read_haxpes_spring8(file_path)
if re.match(r'\s*\d+\s*$', line2):
return read_hapes_xy(file_path, read_ndata = True)
return read_hapes_xy(file_path, read_ndata = False)
[ドキュメント]
def load_spectrum(filename: str, Escale: float = -1.0) -> Tuple[np.ndarray, np.ndarray]:
"""
データファイルを読み込み、エネルギーEと強度IをNumPy配列として返す。
詳細説明:
ファイルの拡張子 (.csv, .xlsx, .txt, .dat) に応じて適切なライブラリや関数を使用してデータを読み込みます。
`.txt` ファイルの場合は、カスタムパーサー `read_haxpes_txt` を使用します。
エネルギーデータには `Escale` を乗算してスケーリング(デフォルトは-1.0で反転)します。
:param filename: 読み込むデータファイルのパス。
:type filename: str
:param Escale: エネルギー軸のスケーリング係数(デフォルトは-1.0で反転)。
:type Escale: float
:returns: エネルギーと強度のNumPy配列のタプル。
:rtype: Tuple[numpy.ndarray, numpy.ndarray]
:raises ValueError: 未対応のファイル形式が指定された場合。
"""
ext = os.path.splitext(filename)[1].lower()
if ext == ".txt":
E, I = read_haxpes_txt(filename)
else:
if ext == ".csv":
df = pd.read_csv(filename)
elif ext == ".xlsx":
df = pd.read_excel(filename)
elif ext in [".txt", ".dat"]: # .txtは上記で処理されるが、念のため残す
df = pd.read_csv(filename, skiprows=1)
else:
raise ValueError(f"未対応のファイル形式: {ext}")
# データフレームが空でないことを確認し、列名を標準化
if df.empty or df.shape[1] < 2:
raise ValueError(f"ファイル '{filename}' はデータが空か、期待される列数を含んでいません。")
# 列名が不明な場合の対処
# ここでは、最初の2列をEnergyとIntensityとして扱う
df.columns = ["Energy (eV)", "Measured Intensity"] + list(df.columns[2:])
E = df["Energy (eV)"].values.astype(float) * Escale
I = df["Measured Intensity"].values.astype(float)
return E, I
# -------------------------------
# モデルスペクトル(カーネル定義)
# -------------------------------
[ドキュメント]
def fermi(E: Union[float, np.ndarray], EF: float, T: float) -> Union[float, np.ndarray]:
"""
フェルミ・ディラック分布関数を計算する。
詳細説明:
温度 `T` とフェルミ準位 `EF` を用いて、エネルギー `E` における占有確率を計算します。
数値的な安定性のために指数関数の引数をクリップし、オーバーフローやアンダーフローを防ぎます。
:param E: エネルギー [eV]。NumPy配列も可能。
:type E: Union[float, numpy.ndarray]
:param EF: フェルミ準位 [eV]。
:type EF: float
:param T: 温度 [K]。
:type T: float
:returns: フェルミ・ディラック分布関数の値。NumPy配列も可能。
:rtype: Union[float, numpy.ndarray]
"""
kB = 8.617333e-5 # eV/K
ke = (E - EF) / (kB * T)
# 数値的に安全な範囲に制限
ke = np.clip(ke, -30, 30)
return 1.0 / (np.exp(ke) + 1.0)
[ドキュメント]
def DOS(E: Union[float, np.ndarray], DOSa: float) -> Union[float, np.ndarray]:
"""
状態密度 (DOS) を計算する。
詳細説明:
ここでは単純な線形傾きを仮定し、`(1.0 - DOSa * E)` で状態密度を表現します。
:param E: エネルギー [eV]。NumPy配列も可能。
:type E: Union[float, numpy.ndarray]
:param DOSa: 状態密度の線形傾きパラメータ。
:type DOSa: float
:returns: 状態密度の値。NumPy配列も可能。
:rtype: Union[float, numpy.ndarray]
"""
return (1.0 - DOSa * E)
[ドキュメント]
def app_func(E_kernel: Union[float, np.ndarray], kG: float) -> Union[float, np.ndarray]:
"""
ガウス関数カーネルを計算する (プロット用)。
詳細説明:
分散パラメータ `kG` を用いて、`exp(-kG * E_kernel^2)` のガウス関数を返します。
主にグラフ描画などの目的で使用されます。
:param E_kernel: カーネルのエネルギー範囲。NumPy配列も可能。
:type E_kernel: Union[float, numpy.ndarray]
:param kG: ガウス関数の分散パラメータ。
:type kG: float
:returns: ガウス関数カーネルの値。NumPy配列も可能。
:rtype: Union[float, numpy.ndarray]
"""
return np.exp(-kG * E_kernel**2)
[ドキュメント]
def kernel_func(dE: Union[float, np.ndarray], plist: List[float]) -> Union[float, np.ndarray]:
"""
スペクトル畳み込みに使用するガウス関数カーネルを計算する。
詳細説明:
`plist` の最初の要素である半値全幅 `w` から `kG` を計算し、
`exp(-kG * dE^2)` のガウス関数を返します。
:param dE: エネルギー差分。NumPy配列も可能。
:type dE: Union[float, numpy.ndarray]
:param plist: 半値全幅 `w` を含むパラメータリスト。`plist[0]` が `w` となります。
:type plist: List[float]
:returns: ガウス関数カーネルの値。NumPy配列も可能。
:rtype: Union[float, numpy.ndarray]
"""
w = plist[0]
kG = fwhm2kG(w)
ret = np.exp(-kG * dE**2)
# print("w=", w, ret[0:5])
return ret
[ドキュメント]
def convolute_by_func_original(E: np.ndarray, I_phys: np.ndarray, w: float) -> np.ndarray:
"""
物理的なスペクトルをガウス関数で畳み込む (オリジナル版、ループ処理)。
詳細説明:
各点について、指定された半値全幅 `w` を持つガウス関数カーネルを用いて、
物理的な強度 `I_phys` を畳み込みます。この実装はPythonのループを使用するため、
NumPy最適化版 (`convolute_by_func_np`) よりも効率が低い可能性があります。
`w` が非常に小さい場合 (1e-10以下) は畳み込みを行わず `I_phys` をそのまま返します。
:param E: エネルギー配列。
:type E: numpy.ndarray
:param I_phys: 物理的な信号強度配列。
:type I_phys: numpy.ndarray
:param w: ガウス関数の半値全幅。
:type w: float
:returns: 畳み込み後の信号強度配列。
:rtype: numpy.ndarray
"""
if w <= 1e-10: return I_phys
ndata = len(E)
dx = np.abs(E[1] - E[0])
di = int( (w * 5.0) / abs(dx) + 1.1 )
I_conv = np.zeros(ndata, dtype=float)
for j in range(ndata):
wtot = 0.0
for k in range(-di, di + 1):
idx_phys = j - k
if idx_phys < 0 or idx_phys >= ndata:
continue
dE = k * dx
G_kernel = kernel_func(dE, [w]) # gaussian_kernel_func から kernel_func に変更
weight = G_kernel * abs(dx)
wtot += weight
I_conv[j] += I_phys[idx_phys] * weight
if wtot > 1e-10:
I_conv[j] /= wtot
else:
I_conv[j] = 0.0
return I_conv
# ==========================================
# 3. NumPy Optimized (修正版)
# ==========================================
[ドキュメント]
def convolute_by_func_np(E: np.ndarray, I_phys: np.ndarray, w: float, kernel_range: float = 5.0) -> np.ndarray:
"""
物理的なスペクトルをガウス関数で畳み込む (NumPy最適化版)。
詳細説明:
指定された半値全幅 `w` を持つガウス関数カーネルを生成し、`numpy.convolve` を使用して
物理的な強度 `I_phys` を効率的に畳み込みます。
カーネルの有効範囲は `kernel_range * w` で設定され、結果はカーネルの合計値で正規化されます。
`w` が非常に小さい場合 (1e-10以下) は畳み込みを行わず `I_phys` をそのまま返します。
:param E: エネルギー配列。
:type E: numpy.ndarray
:param I_phys: 物理的な信号強度配列。
:type I_phys: numpy.ndarray
:param w: ガウス関数の半値全幅。
:type w: float
:param kernel_range: カーネルの有効範囲を `w` の倍数で指定する係数。デフォルトは5.0。
:type kernel_range: float
:returns: 畳み込み後の信号強度配列。
:rtype: numpy.ndarray
"""
if w <= 1e-10: return I_phys
ndata = len(E)
dx = np.abs(E[1] - E[0])
di = int( (w * kernel_range) / dx + 1.1 )
if di < 1: di = 1
k_indices = np.arange(-di, di + 1)
dE_array = k_indices * dx
weights = kernel_func(dE_array, [w]) * dx
I_conv_unnorm = np.convolve(I_phys, weights, mode='same')
W_norm = np.convolve(np.ones(ndata), weights, mode='same')
# 配列サイズ補正 (convolveのmode='same'は入力と同じサイズなので不要な場合が多いが、念のため残す)
if I_conv_unnorm.shape[0] > ndata:
start = (I_conv_unnorm.shape[0] - ndata) // 2
I_conv_unnorm = I_conv_unnorm[start : start + ndata]
W_norm = W_norm[start : start + ndata]
I_conv = np.divide(I_conv_unnorm, W_norm,
out=np.zeros_like(I_conv_unnorm),
where=W_norm > 1e-10)
return I_conv
[ドキュメント]
def model_spectrum(E: np.ndarray, EF: float, BG: float, kG: float, I0: float, DOSa: float, T: float) -> np.ndarray:
"""
光電子分光スペクトルのモデルを計算する。
詳細説明:
状態密度 `DOS` とフェルミ・ディラック分布関数 `fermi` を乗算して物理的なスペクトル `I_phys` を生成し、
これを装置関数(ガウス関数)で畳み込みます。
最終的に、基底 `BG` と信号強度スケール `I0` を適用してモデルスペクトルを完成させます。
:param E: エネルギー配列。
:type E: numpy.ndarray
:param EF: フェルミ準位 [eV]。
:type EF: float
:param BG: バックグラウンド強度。
:type BG: float
:param kG: ガウス関数の分散パラメータ。
:type kG: float
:param I0: シグナル強度スケールファクター。
:type I0: float
:param DOSa: 状態密度の線形傾きパラメータ。
:type DOSa: float
:param T: 温度 [K]。
:type T: float
:returns: 計算されたモデルスペクトル強度配列。
:rtype: numpy.ndarray
"""
w = kG2fwhm(kG)
D = DOS(E, DOSa)
f = fermi(E, EF, T)
I_phys = D * f
conv = convolute_by_func(E, I_phys, w)
return I0 * conv + BG
[ドキュメント]
def calS(opt_params: np.ndarray, E: np.ndarray, I: np.ndarray, T: float, initial_params_dict: Dict[str, float], termination_state: List[Union[bool, float]]) -> float:
"""
目的関数 (残差二乗和 S + ペナルティ項)。
詳細説明:
最適化対象のパラメータ `opt_params` を物理量に復元し、
それらを用いて `model_spectrum` を計算します。
測定データ `I` とモデルの差の二乗和 `S` を計算し、
パラメータが物理的な範囲外に逸脱した場合にペナルティを加算します。
`termination_state` フラグがTrueの場合、計算をスキップし、キャッシュされたエラー値を返します。
NaNやInfのような異常値が発生した場合も大きなエラー値を返します。
:param opt_params: 最適化対象のパラメータ配列(対数変換されている場合あり)。
:type opt_params: numpy.ndarray
:param E: フィットに使用するエネルギー配列。
:type E: numpy.ndarray
:param I: フィットに使用する測定強度配列。
:type I: numpy.ndarray
:param T: 温度 [K]。
:type T: float
:param initial_params_dict: 固定パラメータを含む、全てのパラメータの初期値辞書。
:type initial_params_dict: Dict[str, float]
:param termination_state: 早期停止状態を表すフラグ (bool) と、その際のエラー値 (float) を格納するリスト。
`[is_terminated, last_error_value]` の形式。
:type termination_state: List[Union[bool, float]]
:returns: 残差二乗和とペナルティ項の合計値。
:rtype: float
"""
# 早期停止が確定している場合
if termination_state[0]:
return termination_state[1]
# 1. 全パラメータを元のスケールに復元
EF, BG, w, I0, DOSa = recover_params(opt_params, initial_params_dict)
# -------------------------------------------------------------
# ガード処理
# -------------------------------------------------------------
# [Check 1] パラメータ自体が NaN や Inf になっていないか確認
if not np.all(np.isfinite([EF, BG, w, I0, DOSa])):
return 1e30
# [Check 2] w (FWHM) の物理的妥当性と kG のオーバーフロー防止
# w が 0 に近すぎると、kG = log(2)/(w/2)^2 が無限大になり、exp(-kG*x^2) の計算で不具合が出る
# また、I0 が負である場合も物理的にありえないため除外
min_w_limit = 1e-5 # これより細い幅は計測限界以下として弾く
if w < min_w_limit or I0 <= 1e-10:
return 1e30
# [Check 3] kG の算出
kG = fwhm2kG(w)
# 万が一 kG が巨大になりすぎた場合のガード(double精度の限界考慮)
if kG > 1e20:
return 1e30
# -------------------------------------------------------------
# 2. モデル計算 (エラーチェック済みなので直接実行)
# -------------------------------------------------------------
I_model = model_spectrum(E, EF, BG, kG, I0, DOSa, T)
# [Check 4] 計算結果に NaN/Inf が含まれていないか最終確認
# (convolveの結果などで万が一発生した場合用)
if not np.all(np.isfinite(I_model)):
return 1e30
# 残差二乗和の計算
S = np.sum((I - I_model) ** 2)
# -------------------------------------------------------------
# 3. ペナルティ項の計算
# -------------------------------------------------------------
penalty = 0.0
p_names = ["EF", "BG", "w", "I0", "DOSa"]
p_values = [EF, BG, w, I0, DOSa]
for name, value in zip(p_names, p_values):
p_info = fit_params.get(name)
if not p_info: continue
kpenalty = p_info["kpenalty"]
p_min = p_info["min"]
p_max = p_info["max"]
# w, I0 の正値制約ペナルティ (念のため残すが、上流で弾いているので基本到達しない)
if (name == 'w' and value <= 0) or (name == 'I0' and value <= 0):
penalty += kpenalty * 1e6
continue
if p_min is not None and value < p_min:
penalty += kpenalty * (p_min - value)**2
if p_max is not None and value > p_max:
penalty += kpenalty * (value - p_max)**2
return S + penalty
# -------------------------------
# 初期値推定モード
# -------------------------------
[ドキュメント]
def init_params(args: argparse.Namespace, df_params: pd.DataFrame) -> None:
"""
スペクトルデータから主要なフィッティングパラメータの初期値を推定する。
詳細説明:
スペクトルデータを読み込み、指定されたフィット範囲内で以下のパラメータを推定します。
- BG (バックグラウンド): フィット範囲の最大エネルギー点での強度。
- I0 (信号強度): フィット範囲の最小エネルギー点での強度からBGを引いた値。
- EF (フェルミ準位): (BG + I0/2) の強度を与えるエネルギー点。
- w (装置関数FWHM): デフォルト値と温度による熱的幅から推定。
- DOSa (状態密度の線形傾き): デフォルト値。
推定結果はコンソールに表示され、パラメータCSVファイルに保存されます。
また、推定結果のスペクトルと元の測定スペクトルがプロットされます。
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param df_params: パラメータCSVから読み込まれたDataFrame。
:type df_params: pandas.DataFrame
:returns: None
:raises ValueError: `init` モードで `Efit_min` または `Efit_max` が設定されていない場合、
またはフィット範囲が小さすぎるか無効な場合。
"""
print("\n--- Guess initial parameters ---")
print(f"Efit_min={args.Efit_min}, Efit_max={args.Efit_max}, T={args.T}K")
E, I = load_spectrum(args.filename, args.Escale)
if type(args.Efit_min) is str:
try:
args.Efit_min = float(args.Efit_min)
except:
args.Efit_min = min(E)
if type(args.Efit_max) is str:
try:
args.Efit_max = float(args.Efit_max)
except:
args.Efit_max = max(E)
print()
print("Fitting:")
print(f"Escale : {args.Escale}")
print(f"Efit_min: {args.Efit_min}")
print(f"Efit_max: {args.Efit_max}")
if args.Efit_min is None or args.Efit_max is None:
raise ValueError("mode='init' requires both --Efit_min and --Efit_max to be set.")
mask = (E >= args.Efit_min) & (E <= args.Efit_max)
E_fit, I_fit = E[mask], I[mask]
if len(E_fit) < 2:
raise ValueError("Fit range is too small or invalid.")
idx_max = np.argmin(np.abs(E - args.Efit_max))
BG_init = I[idx_max]
idx_min = np.argmin(np.abs(E - args.Efit_min))
I_min = I[idx_min]
I0_init = I_min - BG_init
if I0_init <= 0:
I0_init = fit_params['I0']['x0']
target_intensity = BG_init + (I0_init / 2.0)
intensity_diff = np.abs(I_fit - target_intensity)
idx_ef = np.argmin(intensity_diff)
EF_init = E_fit[idx_ef]
kB = 8.617333e-5
w_thermal = 3.5 * kB * args.T
w_default = fit_params['w']['x0']
w_init = max(w_default, w_thermal * 1.5)
if fit_params['w']['max'] is not None and w_init > fit_params['w']['max']:
w_init = fit_params['w']['max']
DOSa_init = fit_params['DOSa']['x0']
initial_params_dict = {
"EF": EF_init, "BG": BG_init, "w": w_init, "I0": I0_init, "DOSa": DOSa_init
}
print_params("\nGuessed parameters:", initial_params_dict)
# 推定値をdf_paramsのValue列に書き込み
for name, value in initial_params_dict.items():
if name in df_params.index:
df_params.loc[name, 'Value'] = value
prm_csv_path = get_prm_csv_path(args.filename)
save_params(prm_csv_path, df_params)
print(f"\nGuessed parameters saved to [{os.path.basename(prm_csv_path)}]")
# プロット
kG_init = fwhm2kG(w_init)
# DOSa_init は initial_params_dict に含まれるが、直接使用
I_model = model_spectrum(E, EF_init, BG_init, kG_init, I0_init, DOSa_init, args.T)
plt.figure()
plt.plot(E, I, 'o', label="measured")
plt.plot(E, I_model, '-', label="guessed")
if args.Efit_min: plt.axvline(args.Efit_min, color='gray', linestyle='--', linewidth = 0.5)
if args.Efit_max: plt.axvline(args.Efit_max, color='gray', linestyle='--', linewidth = 0.5)
plt.axvline(EF_init, color='red', linestyle=':', label="EF (guessed)", linewidth = 0.5)
plt.axhline(BG_init + I0_init, color='green', linestyle=':', label="Max Intensity (guessed)", linewidth = 0.5)
plt.axhline(BG_init, color='blue', linestyle=':', label="BG (guessed)", linewidth = 0.5)
plt.xlabel("Energy (eV)")
plt.ylabel("Intensity")
plt.legend()
plt.title("Guess result")
plt.tight_layout()
plt.pause(0.1)
# -------------------------------
# フィッティング処理
# -------------------------------
[ドキュメント]
def fit(args: argparse.Namespace, prm_csv_path: str, df_params: pd.DataFrame) -> None:
"""
光電子分光スペクトルのフィッティングを実行する。
詳細説明:
測定スペクトルを読み込み、指定されたフィット範囲でデータを抽出します。
初期パラメータはコマンドライン引数とCSVファイルから設定されます。
`scipy.optimize.minimize` を使用して `calS` (目的関数) を最小化し、最適化を行います。
最適化の進行状況は、ステップごとのコンソール出力とリアルタイムのグラフ更新で表示されます。
フィッティングが完了すると、最適化されたパラメータがパラメータCSVファイルに保存され、
元のデータ、初期モデル、最終モデルを含むExcelファイルが出力されます。
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param prm_csv_path: パラメータCSVファイルのパス。
:type prm_csv_path: str
:param df_params: パラメータCSVから読み込まれたDataFrame。
:type df_params: pandas.DataFrame
:returns: None
"""
E, I = load_spectrum(args.filename, args.Escale)
if type(args.Efit_min) is str:
try:
args.Efit_min = float(args.Efit_min)
except:
args.Efit_min = min(E)
if type(args.Efit_max) is str:
try:
args.Efit_max = float(args.Efit_max)
except:
args.Efit_max = max(E)
print()
print("Fitting:")
print(f"Escale : {args.Escale}")
print(f"Efit_min: {args.Efit_min}")
print(f"Efit_max: {args.Efit_max}")
print(f"method : {args.method}")
print(f"nmaxiter: {args.nmaxiter}")
# フィット範囲
mask = np.ones_like(E, dtype=bool)
if args.Efit_min is not None:
mask &= E >= args.Efit_min
if args.Efit_max is not None:
mask &= E <= args.Efit_max
E_fit, I_fit = E[mask], I[mask]
# 全パラメータの初期値設定
initial_params_dict = update_params(args, df_params)
# 最適化対象のみを抽出し、logスケールに変換
log_params0 = convert_params(initial_params_dict)
print_params("\nInitial parameters:", initial_params_dict)
history = []
step_count = 0
# --- 外部停止フラグ [is_terminated (bool), last_error (float)] ---
# minimizeの引数として渡すためリストを使用
termination_state = [False, 0.0]
# -------------------------------------------------------------
# ---------------------------------------------
# アニメーション初期化
# ---------------------------------------------
fig, ax = plt.subplots()
ax.plot(E, I, 'o', label="measured", markersize=2)
kG0 = initial_params_dict['kG']
DOSa0 = initial_params_dict['DOSa']
EF0 = initial_params_dict['EF']
BG0 = initial_params_dict['BG']
I00 = initial_params_dict['I0']
I_model_init = model_spectrum(E, EF0, BG0, kG0, I00, DOSa0, args.T)
# 初期スペクトルをプロット
ax.plot(E, I_model_init, '--', color='gray', label="initial")
# フィッティングカーブ用のラインを初期化
line_fit, = ax.plot(E, I_model_init, '-', color='red', label="optimizing...")
if args.Efit_min: ax.axvline(args.Efit_min, color='gray', linestyle='--', linewidth = 0.5)
if args.Efit_max: ax.axvline(args.Efit_max, color='gray', linestyle='--', linewidth = 0.5)
ax.set_xlabel("Energy (eV)")
ax.set_ylabel("Intensity")
ax.legend()
plt.ion()
plt.pause(0.1)
# ---------------------------------------------
prev_opt_params = np.copy(log_params0)
def callback(opt_params: np.ndarray) -> bool:
"""
最適化ステップごとのコールバック関数 (eps停止条件をチェック)。
詳細説明:
この関数は `scipy.optimize.minimize` の各イテレーション後に呼び出されます。
- 最適化対象パラメータを物理量に復元し、目的関数 `calS` を計算します。
- 現在のパラメータ値と誤差 `err` をコンソールに `print_interval` ごとに出力します。
- `eps` を用いた早期停止条件をチェックします。
- フィッティングカーブをリアルタイムで更新し、グラフに表示します。
:param opt_params: 現在の最適化パラメータのNumPy配列。
:type opt_params: numpy.ndarray
:returns: 最適化を継続する場合は `False`、停止する場合は `True`。
:rtype: bool
"""
nonlocal step_count, prev_opt_params, termination_state
# 停止が確定していたら、計算負荷の高い残りの処理をスキップしTrueを返す
if termination_state[0]:
return True
step_count += 1
# 1. 全パラメータを元のスケールに復元
EF, BG, w, I0, DOSa = recover_params(opt_params, initial_params_dict)
# 履歴に保存するデータを作成 (物理量のリスト)
# calSの計算はここで行う (calS内でtermination_stateのチェックは行わない)
err = calS(opt_params, E_fit, I_fit, args.T, initial_params_dict, [False, 0.0]) # フラグは無視
all_param_values = [EF, BG, w, I0, DOSa]
history.append([step_count, *all_param_values, err])
# ---------------------------------------------
# 2. 停止条件のチェック (eps判定)
# ---------------------------------------------
if step_stop: # early_stop の代わりに early_stop_flag を使用
diff_satisfied = True
current_param_values = np.array(all_param_values)
prev_all_params_list = recover_params(prev_opt_params, initial_params_dict)
prev_param_values = np.array(prev_all_params_list)
param_names = ALL_PARAM_NAMES
for i, name in enumerate(param_names):
if fit_params[name]['optid'] == 0:
continue
eps = fit_params[name]['eps']
delta = np.abs(current_param_values[i] - prev_param_values[i])
# パラメータがeps以上変化した場合、停止条件を満たさない
if delta >= eps:
diff_satisfied = False
break
# early_stop_flag が True の場合のみ diff_satisfied の判定が有効
# それ以外は diff_satisfied を False にして継続させる
if not early_stop_flag: diff_satisfied = False
# 早期停止条件を満たした場合、フラグを立てて最後のエラー値をキャッシュ
if diff_satisfied:
termination_state[0] = True
termination_state[1] = err
print(f"[Step {step_count:4d}] EF={EF:.5f}, BG={BG:.5f}, w={w:.5f}, I0={I0:.5f}, DOSa={DOSa:.5f}, Error={err:.6e}")
print(f"\nOptimization terminated successfully: All fitted parameters changed by less than their respective EPS value (Step {step_count}).")
return True # 即座に停止シグナルを返す
# 次ステップのために現在のパラメータを保存
prev_opt_params[:] = opt_params[:]
# 1. コンソール出力: print_intervalごと
is_print = (step_count % args.print_interval == 0)
if step_count == 1 or is_print:
print(f"[Step {step_count:4d}] EF={EF:.5f}, BG={BG:.5f}, w={w:.5f}, I0={I0:.5f}, DOSa={DOSa:.5f}, Error={err:.6e}")
# 2. アニメーション更新
# if step_count % (args.print_interval // 10 + 1) == 0:
if is_print:
kG_c = fwhm2kG(w)
I_model_current = model_spectrum(E, EF, BG, kG_c, I0, DOSa, args.T)
line_fit.set_ydata(I_model_current)
ax.set_title(f"Step {step_count} / Error: {err:.2e}")
fig.canvas.draw()
fig.canvas.flush_events()
plt.pause(0.001)
return False # 継続
# calSの引数に termination_state を追加
options = {'maxiter': args.nmaxiter}
result = minimize(calS, log_params0, args=(E_fit, I_fit, args.T, initial_params_dict, termination_state),
method=args.method, callback=callback, options=options)
# ---------------------------------------------
# 終了後の処理
# ---------------------------------------------
plt.ioff()
# 最適解を元のスケールに復元
EF_fit, BG_fit, w_fit, I0_fit, DOSa_fit = recover_params(result.x, initial_params_dict)
kG_fit = fwhm2kG(w_fit)
final_params_dict = {
"EF": EF_fit, "BG": BG_fit, "w": w_fit, "I0": I0_fit, "DOSa": DOSa_fit
}
print_params("Optimized", final_params_dict)
# パラメータCSVの更新: Valueカラムに最適化結果を上書き
for name, value in final_params_dict.items():
if name in df_params.index:
df_params.loc[name, 'Value'] = value
print()
save_params(prm_csv_path, df_params)
# 最終モデル計算
I_model_final = model_spectrum(E, EF_fit, BG_fit, kG_fit, I0_fit, DOSa_fit, args.T)
# 最終結果の表示
line_fit.set_ydata(I_model_final)
line_fit.set_color('blue')
line_fit.set_label('optimized')
ax.legend()
ax.set_title("Fitting completed")
fig.canvas.draw()
plt.pause(0.1)
# Excel出力 (NumPy配列から直接DataFrameを作成)
df_output = pd.DataFrame({
"Energy (eV)": E,
"Measured": I,
"Initial": I_model_init, # InitialとFinalが逆になっているので修正
"Final": I_model_final,
})
excel_path = os.path.splitext(args.filename)[0] + "_fit.xlsx"
df_output.to_excel(excel_path, index=False)
print(f"Fitting results saved to [{excel_path}]")
# 履歴保存 (変更なし)
"""
history_columns = ["Step"] + ALL_PARAM_NAMES + ["Error"]
hist_array = np.array(history)
hist_df = pd.DataFrame(hist_array, columns=history_columns)
hist_df['kG'] = fwhm2kG(hist_df['w'])
output_cols = ["Step"] + ALL_PARAM_NAMES + ["Error"]
output_hist_df = hist_df[output_cols]
hist_path = os.path.splitext(args.filename)[0] + "_history.xlsx"
output_hist_df.to_excel(hist_path, index=False)
print(f"History saved to [{hist_path}]")
"""
# -------------------------------
# シミュレーションモード
# -------------------------------
[ドキュメント]
def simulate(args: argparse.Namespace, df_params: pd.DataFrame) -> None:
"""
指定されたパラメータと条件に基づいて光電子分光スペクトルをシミュレーションし、結果をプロットする。
詳細説明:
コマンドライン引数やCSVファイルから読み込んだパラメータを用いて、
`model_spectrum` 関数によりスペクトルを計算します。
計算されたモデルスペクトルと測定データ、およびモデルの構成要素
(状態密度 (DOS), フェルミ・ディラック分布関数 (fFD), DOSとfFDの積, 装置関数 (g_app))
をグラフに表示します。
:param args: コマンドライン引数。
:type args: argparse.Namespace
:param df_params: パラメータCSVから読み込まれたDataFrame。
:type df_params: pandas.DataFrame
:returns: None
:raises ValueError: `w` (FWHM) または `I0` (シグナル強度) が正でない場合。
"""
E, I = load_spectrum(args.filename, args.Escale)
if type(args.Efit_min) is str:
try:
args.Efit_min = float(args.Efit_min)
except:
args.Efit_min = min(E)
if type(args.Efit_max) is str:
try:
args.Efit_max = float(args.Efit_max)
except:
args.Efit_max = max(E)
print()
print("Simulation:") # Fitting: を Simulation: に変更
print(f"Escale : {args.Escale}")
print(f"Efit_min: {args.Efit_min}")
print(f"Efit_max: {args.Efit_max}")
initial_params_dict = update_params(args, df_params)
EF0 = initial_params_dict['EF']
BG0 = initial_params_dict['BG']
w0 = initial_params_dict['w']
I00 = initial_params_dict['I0']
DOSa0 = initial_params_dict['DOSa']
kG0 = initial_params_dict['kG']
if w0 <= 0 or I00 <= 0:
raise ValueError("w (FWHM) and I0 must be positive for simulation.")
print()
print("Simulation parameters:")
print_params("使用パラメータ", initial_params_dict)
I_model = model_spectrum(E, EF0, BG0, kG0, I00, DOSa0, args.T)
D = DOS(E, DOSa0)
f = fermi(E, EF0, args.T)
# カーネルプロット用のエネルギー範囲を調整
# E_kernelは中心が0になるようにlinspaceで生成
E_center = (E[-1] + E[0]) / 2.0
E_range = E[-1] - E[0]
num_points = len(E)
E_kernel = np.linspace(-E_range / 2, E_range / 2, num_points)
kG_for_plot = fwhm2kG(w0)
G = app_func(E_kernel, kG_for_plot)
if np.max(G) > 1e-10: # ゼロ除算防止
G /= np.max(G)
plt.figure()
plt.plot(E, I, label="obs", linestyle = '', marker = 'o', markersize = 2.0)
plt.plot(E, I_model, label="sim", linestyle = '-')
plt.plot(E, 0.5*I00 * D, label="DOS(E)", linestyle = '-', linewidth = 0.5)
plt.plot(E, 0.5*I00 * f, label="fFD(E)", linestyle = '-', linewidth = 0.5)
plt.plot(E, 0.5*I00 * D * f, label="DOS(E)*fFD(E)", linestyle = '-', linewidth = 0.5)
plt.plot(E, 0.5*I00 * G, label="g_app(E)", linestyle = '-', linewidth = 0.5)
if args.Efit_min: plt.axvline(args.Efit_min, color='gray', linestyle='--', linewidth = 0.5)
if args.Efit_max: plt.axvline(args.Efit_max, color='gray', linestyle='--', linewidth = 0.5)
plt.xlabel("Energy (eV)")
plt.ylabel("Intensity")
plt.legend()
plt.title("Simulation result") # Plot title for simulation
plt.tight_layout()
plt.pause(0.1)
# -------------------------------
# メイン関数
# -------------------------------
[ドキュメント]
def main():
"""
プログラムのエントリポイント。
詳細説明:
コマンドライン引数を処理し、指定されたモード(fit, sim, init)に応じて
適切な処理を呼び出します。
- 畳み込み関数を `convolute_by_func_np` に設定します。
- `early_stop_flag` は `Nelder-Mead` メソッドでない場合に `True` に設定されます。
- `w` と `I0` のコマンドライン引数での正値チェックを行います。
- パラメータCSVファイルの読み込みと更新を管理します。
:returns: None
:raises ValueError: コマンドライン引数で `w` または `I0` が正でない場合、
または未対応のモードが指定された場合。
"""
global convolute_by_func, early_stop_flag # early_stop を early_stop_flag に変更
convolute_by_func = convolute_by_func_np
# convolute_by_func = convolute_by_func_original
early_stop_flag = True # デフォルトを True に設定し、メソッドによって変更
args = initialize()
if args.method.lower() == 'simplex': args.method = 'nelder-mead'
if args.method.lower() == 'nelder-mead':
early_stop_flag = False # Nelder-Mead は収束が遅いため、epsによる早期停止は非推奨
if (args.mode != "init" and (args.w is not None and args.w <= 0 or args.I0 is not None and args.I0 <= 0)):
raise ValueError("w (FWHM) and I0 must be positive in command line arguments.")
prm_csv_path = get_prm_csv_path(args.filename)
df_params = load_params(prm_csv_path)
if args.mode == "init":
init_params(args, df_params)
elif args.mode == "fit":
fit(args, prm_csv_path, df_params)
elif args.mode == "sim":
simulate(args, df_params)
else:
raise ValueError("modeは 'fit', 'sim', 'init' のいずれかを指定してください")
if __name__ == "__main__":
main()
terminate()