#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ft_exafs_scan.py
EXAFSデータのk空間におけるFFT/MEM変換ツールです。
入力データ:
Column 0 : k [A^-1]
Column 1 : signal, 通常 chi(k), k^2 chi(k) など。
主な機能:
- EXAFS R軸補正: chi(k) ~ sin(2 k R + phase) に基づき、R = pi * frequency を使用。
- 変換モード: fft, mem, both のいずれかを選択可能。
- FFT出力: real, imag, amplitude, power を選択可能。
- ゼロパディング: --npadding オプションで制御(デフォルト 1024)。
- k重み付け: --korder オプションで制御。
- EXAFSスタイルのハニング窓: --hrange と --hwidth オプションで制御。
- パラメータスキャンオーバーレイ:
--scan_order START STEP N
--scan_hmin START STEP N
--scan_hmax START STEP N
--scan_korder START STEP N
--scan_npadding START STEP N
- オプションの正規化:
--normalize none|max
使用例:
# 通常のFFT
python ft_exafs_scan.py data.xlsx --mode fft --korder 0 --hrange 2 12 --hwidth 0.5
# MEM, 固定次数 20
python ft_exafs_scan.py data.xlsx 20 --mode mem --korder 0 --hrange 2 12 --hwidth 0.5
# MEM次数をスキャン: 10,20,30,40,50
python ft_exafs_scan.py data.xlsx --mode mem --scan_order 10 10 5 --korder 0 --hrange 2 12 --hwidth 0.5
# ハニング窓の幅をスキャン: 0.2,0.4,0.6,0.8,1.0
python ft_exafs_scan.py data.xlsx --mode fft --scan_hwidth 0.2 0.2 5 --korder 0 --hrange 2 12
# 形状比較のために各曲線を最大値1に正規化してスキャン
python ft_exafs_scan.py data.xlsx --mode mem --scan_order 20 20 5 --normalize max
関連リンク:
:doc:`ft_exafs_scan_usage`
"""
import argparse
import copy
import os
import sys
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from spectrum import pburg
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.stattools import pacf
warnings.filterwarnings("ignore")
# ============================================================
# Input
# ============================================================
[ドキュメント]
def load_xy(infile, sheet=0, xcol=0, ycol=1):
"""
.xlsx/.csv/.txt/.datファイルからk軸と信号の列を読み込みます。
詳細説明:
指定された入力ファイルパスから、k軸と信号のデータを読み込みます。
ファイル形式はExcel (.xlsx)、CSV (.csv)、テキスト (.txt, .dat) をサポートします。
欠損値 (NaN, Inf) は除去され、データポイントが少ない場合はエラーとなります。
k軸が単調増加でない場合はソートされます。
:param infile: 入力ファイルへのパス。
:type infile: str
:param sheet: Excelファイルの場合のシート名またはシートインデックス。デフォルトは0。
:type sheet: Union[str, int]
:param xcol: k軸データの列インデックス(0から始まる)。デフォルトは0。
:type xcol: int
:param ycol: 信号データの列インデックス(0から始まる)。デフォルトは1。
:type ycol: int
:returns: k軸データと信号データ。
:rtype: tuple[numpy.ndarray, numpy.ndarray]
:raises ValueError: サポートされていないファイル形式、2Dテーブルでないテキスト入力、
指定された列インデックスが存在しない、有効なデータポイントが少ない、
などの場合に発生します。
"""
ext = os.path.splitext(infile)[1].lower()
if ext == ".xlsx":
df = pd.read_excel(infile, sheet_name=sheet)
k = df.iloc[:, xcol].to_numpy(dtype=float)
signal = df.iloc[:, ycol].to_numpy(dtype=float)
elif ext == ".csv":
df = pd.read_csv(infile)
k = df.iloc[:, xcol].to_numpy(dtype=float)
signal = df.iloc[:, ycol].to_numpy(dtype=float)
elif ext in [".txt", ".dat"]:
arr = np.loadtxt(infile)
if arr.ndim != 2:
raise ValueError("Text input must be a 2D table.")
if arr.shape[1] <= max(xcol, ycol):
raise ValueError(
f"Text input must have at least {max(xcol, ycol) + 1} columns."
)
k = arr[:, xcol].astype(float)
signal = arr[:, ycol].astype(float)
else:
raise ValueError("Unsupported file format. Use .xlsx, .csv, .txt, or .dat.")
mask = np.isfinite(k) & np.isfinite(signal)
k = k[mask]
signal = signal[mask]
if len(k) < 4:
raise ValueError("Not enough valid data points.")
if np.any(np.diff(k) < 0):
print("WARNING: k-axis is not monotonic. Sorting by k.")
idx = np.argsort(k)
k = k[idx]
signal = signal[idx]
return k, signal
[ドキュメント]
def estimate_dk(k, rtol=1e-3):
"""
k軸からk間隔(dk)を推定します。
詳細説明:
k軸の隣接する値の差分の平均値からk間隔を推定します。
k軸の均一性を相対標準偏差で評価し、均一でない場合は警告を表示します。
FFT/MEMは均一にサンプリングされたkデータを仮定するため、不均一な場合は注意が必要です。
:param k: k軸データ。
:type k: numpy.ndarray
:param rtol: k間隔の差分の相対標準偏差の許容範囲。この値を超えると警告が表示されます。デフォルトは1e-3。
:type rtol: float
:returns: 推定されたk間隔と、k間隔の差分の相対標準偏差。
:rtype: tuple[float, float]
:raises ValueError: dkが0の場合に発生します。
"""
dk_arr = np.diff(k)
dk = np.mean(dk_arr)
if dk == 0:
raise ValueError("dk is zero.")
rel_std = np.std(dk_arr) / abs(dk)
if rel_std > rtol:
print("WARNING: k-axis is not perfectly uniform.")
print(f" mean dk = {dk:.10g}")
print(f" std(diff) = {np.std(dk_arr):.10g}")
print(f" relative std = {rel_std:.3e}")
print(" FFT/MEM assume uniformly sampled k data.")
print(" Consider interpolation onto a uniform k grid if needed.")
return dk, rel_std
# ============================================================
# Window and preprocessing
# ============================================================
[ドキュメント]
def make_hanning_range_window(k, hrange=None, hwidth=0.5):
"""
k軸にEXAFSスタイルのハニング窓を作成します。
詳細説明:
指定されたk範囲(`hrange`)と窓幅(`hwidth`)に基づいて、ハニング窓を生成します。
`hrange`が指定されない場合、すべてのkに対して1.0の窓関数が返されます。
`hwidth`が0の場合、矩形窓が生成されます。
`2.0 * hwidth`が`hrange`の幅よりも大きい場合、`hwidth`は自動調整されます。
:param k: k軸データ。
:type k: numpy.ndarray
:param hrange: ハニング窓を適用するk範囲 [KSTART, KEND]。デフォルトはNone。
:type hrange: Optional[list[float]]
:param hwidth: ハニング窓のテーパー幅 [A^-1]。デフォルトは0.5。
:type hwidth: float
:returns: k軸と同じ長さのハニング窓配列。
:rtype: numpy.ndarray
:raises ValueError: `hrange`に2つの異なる値が必要な場合、または`hwidth`が0未満の場合に発生します。
"""
k = np.asarray(k, dtype=float)
if hrange is None:
return np.ones_like(k, dtype=float)
k1 = float(hrange[0])
k2 = float(hrange[1])
if k2 < k1:
k1, k2 = k2, k1
if k2 <= k1:
raise ValueError("--hrange requires two different values: KSTART KEND")
if hwidth < 0:
raise ValueError("--hwidth must be >= 0")
if 2.0 * hwidth > (k2 - k1):
new_hwidth = 0.5 * (k2 - k1)
print("WARNING: 2*hwidth is larger than hrange width.")
print(f" hwidth changed from {hwidth:.10g} to {new_hwidth:.10g}")
hwidth = new_hwidth
win = np.zeros_like(k, dtype=float)
if hwidth == 0:
win[(k >= k1) & (k <= k2)] = 1.0
return win
left = (k >= k1) & (k < k1 + hwidth)
flat = (k >= k1 + hwidth) & (k <= k2 - hwidth)
right = (k > k2 - hwidth) & (k <= k2)
win[left] = 0.5 * (1.0 - np.cos(np.pi * (k[left] - k1) / hwidth))
win[flat] = 1.0
win[right] = 0.5 * (1.0 + np.cos(np.pi * (k[right] - (k2 - hwidth)) / hwidth))
return win
[ドキュメント]
def preprocess_signal(k, signal_raw, korder=0.0, hrange=None, hwidth=0.5,
remove_mean=False):
"""
k重み付け、オプションの平均減算、ハニング窓を適用して信号を前処理します。
詳細説明:
入力された生の信号に、`korder`に基づくk重み付けを適用します。
例えば、入力がchi(k)の場合、`korder=2`を指定するとk^2 chi(k)になります。
その後、`remove_mean`がTrueの場合、k重み付けされた信号から平均値が減算されます。
最後に、`make_hanning_range_window`で作成されたハニング窓が適用され、
変換に使用される最終的な信号が生成されます。
:param k: k軸データ。
:type k: numpy.ndarray
:param signal_raw: 生の信号データ。
:type signal_raw: numpy.ndarray
:param korder: 信号に適用するk重み付けの次数。デフォルトは0.0。
:type korder: float
:param hrange: ハニング窓を適用するk範囲 [KSTART, KEND]。デフォルトはNone。
:type hrange: Optional[list[float]]
:param hwidth: ハニング窓のテーパー幅 [A^-1]。デフォルトは0.5。
:type hwidth: float
:param remove_mean: k重み付け後に信号の平均値を減算するかどうか。デフォルトはFalse。
:type remove_mean: bool
:returns: 前処理された信号、k重み付けされた信号、平均値が減算された信号、および適用されたハニング窓。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]
"""
k = np.asarray(k, dtype=float)
signal_raw = np.asarray(signal_raw, dtype=float)
kweight = np.power(k, korder)
signal_kweighted = signal_raw * kweight
if remove_mean:
signal_centered = signal_kweighted - np.mean(signal_kweighted)
else:
signal_centered = signal_kweighted.copy()
window = make_hanning_range_window(k, hrange=hrange, hwidth=hwidth)
signal_used = signal_centered * window
return signal_used, signal_kweighted, signal_centered, window
# ============================================================
# AR order selection
# ============================================================
[ドキュメント]
def select_ar_order(signal, preset_order=8, max_order=40, order_method="bic"):
"""
AIC/BICまたは事前設定値に基づいてARモデルの次数を選択します。
詳細説明:
自己回帰 (AR) モデルの次数を選択するために、部分自己相関関数 (PACF)、
赤池情報量基準 (AIC)、ベイズ情報量基準 (BIC) を使用します。
PACFは診断情報として表示されますが、主要な次数選択にはAIC/BICが使われます。
`order_method`に基づいて、最適な次数が選択されます。
:param signal: ARモデルを適用する信号データ。
:type signal: numpy.ndarray
:param preset_order: `order_method`が"preset"の場合に適用される、事前設定のAR次数。デフォルトは8。
:type preset_order: int
:param max_order: AIC/BICでテストするARモデルの最大次数。デフォルトは40。
:type max_order: int
:param order_method: AR次数選択方法。"bic", "aic", "preset"のいずれか。デフォルトは"bic"。
:type order_method: str
:returns: 選択されたAR次数と、AIC/BICの情報を含む辞書。
:rtype: tuple[int, dict]
:raises ValueError: `order_method`がサポートされていない値の場合に発生します。
"""
n = len(signal)
max_lag = min(max_order, n // 2)
try:
pacf_vals, confint = pacf(signal, nlags=max_lag, alpha=0.05)
print(f"PACF with 95% CI for lags 1..{max_lag}:")
significant_lags = []
for lag in range(1, max_lag + 1):
lower, upper = confint[lag]
val = pacf_vals[lag]
sig = (lower > 0.0) or (upper < 0.0)
marker = "*" if sig else " "
print(
f"lag={lag:2d}: pacf={val: .4f} "
f"CI=({lower: .4f},{upper: .4f}) {marker}"
)
if sig:
significant_lags.append(lag)
if significant_lags:
print("PACF significant lags, diagnostic only: " f"{significant_lags[:10]}")
else:
print("No PACF spike outside zero-confidence interval.")
print("PACF is not used as the primary order selector.\n")
except Exception as e:
print(f"PACF calculation failed: {e}")
print("Continue with AIC/BIC order selection.\n")
best_aic = np.inf
best_bic = np.inf
best_p_aic = None
best_p_bic = None
max_ar = min(max_order, n - 1)
for p in range(1, max_ar + 1):
try:
model = AutoReg(signal, lags=p, old_names=False).fit()
if model.aic < best_aic:
best_aic = model.aic
best_p_aic = p
if model.bic < best_bic:
best_bic = model.bic
best_p_bic = p
except Exception:
pass
print(f"Selected order by AIC: {best_p_aic}, AIC={best_aic:.6g}")
print(f"Selected order by BIC: {best_p_bic}, BIC={best_bic:.6g}\n")
if order_method == "bic":
if best_p_bic is not None:
order_selected = best_p_bic
elif best_p_aic is not None:
order_selected = best_p_aic
else:
order_selected = preset_order
elif order_method == "aic":
if best_p_aic is not None:
order_selected = best_p_aic
elif best_p_bic is not None:
order_selected = best_p_bic
else:
order_selected = preset_order
elif order_method == "preset":
order_selected = preset_order
else:
raise ValueError("order_method must be one of: bic, aic, preset")
print(f"Order selection method = {order_method}")
print(f"Using AR model order = {order_selected}\n")
return order_selected, {
"order_method": order_method,
"best_p_aic": best_p_aic,
"best_aic": best_aic,
"best_p_bic": best_p_bic,
"best_bic": best_bic,
}
# ============================================================
# FFT / MEM
# ============================================================
[ドキュメント]
def calc_fft_exafs(signal_used, dk, nfft):
"""
EXAFSスタイルのFFTを計算します。
詳細説明:
FFTは `np.fft.rfft` を使用して計算されます。
EXAFSでは `chi(k) ~ sin(2 k R + phi)` の関係があるため、R軸は `R = pi * frequency` と定義されます。
積分スケールを模倣するために、FTの結果は `dk` を乗算されます。
結果として、複素FFT、実数部、虚数部、振幅、パワーが返されます。
:param signal_used: 前処理された信号データ。
:type signal_used: numpy.ndarray
:param dk: k軸の平均k間隔。
:type dk: float
:param nfft: FFTのNFFT長(ゼロパディングを含む)。
:type nfft: int
:returns: EXAFS R軸、複素FFT、FFTの実数部、FFTの虚数部、FFTの振幅、FFTのパワー。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]
"""
ft_complex = np.fft.rfft(signal_used, n=nfft) * dk
freq = np.fft.rfftfreq(nfft, d=dk)
R = np.pi * freq
ft_real = ft_complex.real
ft_imag = ft_complex.imag
ft_amp = np.abs(ft_complex)
ft_power = ft_amp ** 2
return R, ft_complex, ft_real, ft_imag, ft_amp, ft_power
[ドキュメント]
def calc_mem_exafs(signal_used, dk, order, nfft):
"""
`spectrum.pburg` を使用してMEMスペクトルを計算します。
詳細説明:
Burg法(`spectrum.pburg`)を用いて最大エントロピー法 (MEM) スペクトルを計算します。
`pburg.frequencies()` は `cycles/sample` の単位で周波数を返すため、
EXAFSのR軸 (`R = pi * frequency / dk`) に変換されます。
:param signal_used: 前処理された信号データ。
:type signal_used: numpy.ndarray
:param dk: k軸の平均k間隔。
:type dk: float
:param order: ARモデルの次数。
:type order: int
:param nfft: FFTのNFFT長(ゼロパディングを含む)。
:type nfft: int
:returns: EXAFS R軸、サンプルあたりの周波数、MEMスペクトルのパワースペクトル密度 (PSD)。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]
"""
burg_spec = pburg(signal_used, order=order, NFFT=nfft)
mem_psd = np.asarray(burg_spec.psd, dtype=float)
freqs_sample = np.asarray(burg_spec.frequencies(), dtype=float)
# pburg.frequencies() is cycles/sample. Convert to R for EXAFS.
R = np.pi * freqs_sample / dk
return R, freqs_sample, mem_psd
# ============================================================
# Utilities
# ============================================================
[ドキュメント]
def find_main_peak(R, y, rmin=1e-12):
"""
R=0を除外してメインピークを見つけます。
詳細説明:
指定されたR軸とyデータから、Rが`rmin`よりも大きい範囲でyの最大値を見つけ、
そのピークのR値とy値を返します。
:param R: R軸データ。
:type R: numpy.ndarray
:param y: 変換された信号(振幅、PSDなど)。
:type y: numpy.ndarray
:param rmin: ピーク検索から除外するR軸の最小値。デフォルトは1e-12。
:type rmin: float
:returns: メインピークのR値とy値。ピークが見つからない場合はNaN。
:rtype: tuple[float, float]
"""
R = np.asarray(R)
y = np.asarray(y)
mask = np.isfinite(R) & np.isfinite(y) & (R > rmin)
if not np.any(mask):
return np.nan, np.nan
R2 = R[mask]
y2 = y[mask]
idx = np.argmax(y2)
return R2[idx], y2[idx]
[ドキュメント]
def normalize_curve(y, method="none"):
"""
プロットや比較のために1つの曲線を正規化します。
詳細説明:
`method`が"none"の場合、入力`y`は変更されずに返されます。
`method`が"max"の場合、`y`の有限値の絶対値の最大値で`y`全体を割って正規化します。
これは、MEMスペクトルのスケールがAR次数によって大きく変化する場合の比較に特に有用です。
:param y: 正規化する曲線データ。
:type y: numpy.ndarray
:param method: 正規化方法。"none"または"max"。デフォルトは"none"。
:type method: str
:returns: 正規化された曲線データ。
:rtype: numpy.ndarray
:raises ValueError: `method`が"none"または"max"以外の値の場合に発生します。
"""
y = np.asarray(y, dtype=float)
if method == "none":
return y.copy()
if method != "max":
raise ValueError("normalize must be 'none' or 'max'")
out = y.copy()
mask = np.isfinite(out)
if not np.any(mask):
return out
ymax = np.nanmax(np.abs(out[mask]))
if ymax > 0:
out = out / ymax
return out
[ドキュメント]
def normalized_label(label, normalize):
"""
プロットラベルに正規化タグを追加します。
詳細説明:
正規化方法が"max"の場合、元のラベルに `/ max` を追加して返します。
それ以外の正規化方法の場合は、元のラベルをそのまま返します。
:param label: 元のプロットラベル。
:type label: str
:param normalize: 正規化方法。
:type normalize: str
:returns: 正規化タグが追加された(または変更なしの)ラベル。
:rtype: str
"""
if normalize == "max":
return f"{label} / max"
return label
[ドキュメント]
def make_output_filename(infile, mode, order, korder, scan_param=None):
"""
出力Excelファイル名を生成します。
詳細説明:
入力ファイル名、変換モード、MEM次数、k重み付け次数、およびオプションのスキャンパラメータに基づいて、
自動的に出力ファイル名を生成します。
:param infile: 入力ファイルへのパス。
:type infile: str
:param mode: 変換モード ("fft", "mem", "both")。
:type mode: str
:param order: MEMのAR次数。
:type order: int
:param korder: k重み付けの次数。
:type korder: float
:param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。
:type scan_param: Optional[str]
:returns: 生成された出力ファイル名。
:rtype: str
"""
stem, _ = os.path.splitext(infile)
scan_tag = "" if scan_param is None else f"_scan_{scan_param}"
if mode == "mem":
return f"{stem}_exafs_mem_order={order}_korder={korder:g}{scan_tag}.xlsx"
elif mode == "fft":
return f"{stem}_exafs_fft_korder={korder:g}{scan_tag}.xlsx"
else:
return f"{stem}_exafs_fft_mem_order={order}_korder={korder:g}{scan_tag}.xlsx"
[ドキュメント]
def scan_values(start, step, n, integer=False):
"""
スキャンパラメータの値を生成します。
詳細説明:
開始値 (`start`)、ステップサイズ (`step`)、およびステップ数 (`n`) に基づいて、
等間隔の数値のリストを生成します。`integer`がTrueの場合、値は整数に丸められます。
:param start: スキャン開始値。
:type start: float
:param step: 各ステップでの増分値。
:type step: float
:param n: 生成する値の数。
:type n: int
:param integer: 生成された値を整数に丸めるかどうか。デフォルトはFalse。
:type integer: bool
:returns: 生成されたスキャン値のリスト。
:rtype: list[float] or list[int]
"""
vals = [start + step * i for i in range(int(n))]
if integer:
vals = [int(round(v)) for v in vals]
return vals
[ドキュメント]
def get_active_scan(args):
"""
アクティブなスキャンパラメータとその値を取得します。
詳細説明:
同時に指定できるスキャンパラメータは1つだけです。
複数のスキャンパラメータが指定された場合、エラーを発生させます。
選択されたスキャンパラメータとそれに対応する値のリストを返します。
:param args: コマンドライン引数を格納する名前空間オブジェクト。
:type args: argparse.Namespace
:returns: アクティブなスキャンパラメータの名前と、それに対応する値のリスト。
アクティブなスキャンがない場合は (None, None) を返します。
:rtype: tuple[Optional[str], Optional[list[Union[int, float]]]]
:raises ValueError: 複数のスキャンパラメータが指定された場合、または有効なスキャン値が生成されない場合に発生します。
"""
specs = {
"order": args.scan_order,
"hmin": args.scan_hmin,
"hmax": args.scan_hmax,
"hwidth": args.scan_hwidth,
"korder": args.scan_korder,
"npadding": args.scan_npadding,
}
active = [(name, spec) for name, spec in specs.items() if spec is not None]
if len(active) == 0:
return None, None
if len(active) > 1:
names = ", ".join(name for name, _ in active)
raise ValueError(
"Only one scan parameter can be specified at a time. "
f"Active scans: {names}"
)
name, spec = active[0]
start, step, n = spec
integer = name in ["order", "npadding"]
vals = scan_values(start, step, int(n), integer=integer)
if name == "order":
vals = [v for v in vals if v > 0]
elif name == "npadding":
vals = [v for v in vals if v > 0]
if len(vals) == 0:
raise ValueError(f"No valid scan values for {name}.")
return name, vals
[ドキュメント]
def apply_scan_value(args, scan_param, value, k):
"""
渡されたスキャンパラメータの値を `args` 名前空間に適用し、そのコピーを返します。
詳細説明:
元の `args` オブジェクトを変更しないように、まずコピーを作成します。
次に、`scan_param`に応じて対応する`args`属性に`value`を適用します。
`hrange`が設定されていない場合は、k軸の全範囲で初期化されます。
:param args: コマンドライン引数を格納する元の名前空間オブジェクト。
:type args: argparse.Namespace
:param scan_param: 変更するスキャンパラメータの名前。
:type scan_param: str
:param value: 適用するスキャンパラメータの値。
:type value: Union[int, float]
:param k: k軸データ。`hrange`の初期化に使用される場合があります。
:type k: numpy.ndarray
:returns: 指定されたスキャン値が適用された `args` 名前空間のコピー。
:rtype: argparse.Namespace
:raises ValueError: 未知のスキャンパラメータが指定された場合に発生します。
"""
a = copy.copy(args)
if scan_param == "order":
a.preset_order = int(value)
a.order_method = "preset"
elif scan_param == "hmin":
if a.hrange is None:
a.hrange = [float(np.min(k)), float(np.max(k))]
a.hrange = [float(value), float(a.hrange[1])]
elif scan_param == "hmax":
if a.hrange is None:
a.hrange = [float(np.min(k)), float(np.max(k))]
a.hrange = [float(a.hrange[0]), float(value)]
elif scan_param == "hwidth":
a.hwidth = float(value)
elif scan_param == "korder":
a.korder = float(value)
elif scan_param == "npadding":
a.npadding = int(value)
else:
raise ValueError(f"Unknown scan parameter: {scan_param}")
return a
[ドキュメント]
def y_from_fft_result(fft_result, fft_plot):
"""
FFT結果からプロットに適したR軸とyデータを抽出します。
詳細説明:
`fft_plot`引数の値に基づいて、FFTの振幅、パワー、または実数部を返します。
:param fft_result: `calc_fft_exafs`関数から返されたFFT結果のタプル。
:type fft_result: tuple
:param fft_plot: プロットするFFTの種類。"amplitude", "power", "realimag"のいずれか。
:type fft_plot: str
:returns: R軸、プロットするyデータ、yデータの名前。
:rtype: tuple[numpy.ndarray, numpy.ndarray, str]
"""
R, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result
if fft_plot == "power":
return R, ft_power, "FFT power"
if fft_plot == "realimag":
return R, ft_real, "FFT real"
return R, ft_amp, "FFT amplitude"
[ドキュメント]
def y_from_mem_result(mem_result):
"""
MEM結果からプロットに適したR軸とyデータを抽出します。
詳細説明:
`calc_mem_exafs`関数から返されたMEM結果から、R軸とMEMのパワースペクトル密度 (PSD) を抽出します。
:param mem_result: `calc_mem_exafs`関数から返されたMEM結果のタプル。
:type mem_result: tuple
:returns: R軸、プロットするMEM PSDデータ、yデータの名前。
:rtype: tuple[numpy.ndarray, numpy.ndarray, str]
"""
R, freqs_mem_sample, mem_psd = mem_result
return R, mem_psd, "MEM PSD"
# ============================================================
# Single calculation
# ============================================================
# ============================================================
# Save Excel
# ============================================================
[ドキュメント]
def save_results_excel(
outfile,
args,
k,
signal_raw,
dk,
rel_std_dk,
result,
scan_results=None,
scan_param=None,
):
"""
入力データ、FFT/MEM結果、およびスキャン結果をExcelファイルに保存します。
詳細説明:
結果は複数のシートに分割されて保存されます。
- `Input` シート: 生の信号、前処理された信号、窓関数など。
- `FFT` シート (FFTモードの場合): R軸、FFTの実数部、虚数部、振幅、パワー。正規化されたデータも含まれます。
- `MEM` シート (MEMモードの場合): R軸、MEMのPSD。正規化されたデータも含まれます。
- `ScanSummary` シート (スキャンモードの場合): 各スキャンパラメータ値でのピーク位置やパラメータの詳細。
- `ScanCurves` シート (スキャンモードの場合): 各スキャンにおける変換曲線のデータ。
- `Summary` シート: 変換に使用された主要なパラメータと情報。
:param outfile: 結果を保存するExcelファイルへのパス。
:type outfile: str
:param args: コマンドライン引数を格納する名前空間オブジェクト。
:type args: argparse.Namespace
:param k: k軸データ。
:type k: numpy.ndarray
:param signal_raw: 生の信号データ。
:type signal_raw: numpy.ndarray
:param dk: k軸の平均k間隔。
:type dk: float
:param rel_std_dk: k間隔の差分の相対標準偏差。
:type rel_std_dk: float
:param result: 単一の変換結果を含む辞書。`run_transform_for_args`の戻り値。
:type result: dict
:param scan_results: スキャンモードの場合の各スキャン結果のリスト。デフォルトはNone。
:type scan_results: Optional[list[dict]]
:param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。
:type scan_param: Optional[str]
:returns: None
"""
signal_kweighted = result["signal_kweighted"]
signal_centered = result["signal_centered"]
window = result["window"]
signal_used = result["signal_used"]
fft_result = result["fft_result"]
mem_result = result["mem_result"]
order_selected = result["order_selected"]
order_info = result["order_info"]
df_input = pd.DataFrame({
"k_A^-1": k,
"signal_raw": signal_raw,
f"signal_k^{args.korder:g}_weighted": signal_kweighted,
"signal_centered": signal_centered,
"hanning_window": window,
"signal_used": signal_used,
})
summary_items = [
"input_file",
"sheet",
"xcol",
"ycol",
"mode",
"n_data",
"npadding",
"nfft_used",
"dk [A^-1]",
"relative_std_dk",
"korder",
"remove_mean",
"hrange",
"hwidth [A^-1]",
"AR_order_selected",
"order_method",
"best_p_aic",
"best_aic",
"best_p_bic",
"best_bic",
"scan_parameter",
"normalize",
"R_axis_definition",
"FFT_definition",
]
summary_values = [
args.infile,
args.sheet,
args.xcol,
args.ycol,
args.mode,
len(k),
args.npadding,
result["nfft"],
dk,
rel_std_dk,
args.korder,
args.remove_mean,
"" if args.hrange is None else str(args.hrange),
args.hwidth,
order_selected if order_selected is not None else "",
order_info.get("order_method", "") if order_info else "",
order_info.get("best_p_aic", "") if order_info else "",
order_info.get("best_aic", "") if order_info else "",
order_info.get("best_p_bic", "") if order_info else "",
order_info.get("best_bic", "") if order_info else "",
"" if scan_param is None else scan_param,
args.normalize,
"EXAFS: R = pi * frequency, because chi(k) ~ sin(2 k R + phase)",
"FT = rFFT(signal_used, nfft) * dk",
]
with pd.ExcelWriter(outfile, engine="openpyxl") as writer:
df_input.to_excel(writer, sheet_name="Input", index=False)
if fft_result is not None:
R_fft, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result
peak_R_fft, peak_y_fft = find_main_peak(R_fft, ft_amp)
df_fft = pd.DataFrame({
"R_A": R_fft,
"FT_real": ft_real,
"FT_imag": ft_imag,
"FT_amplitude": ft_amp,
"FT_power": ft_power,
"FT_real_norm": normalize_curve(ft_real, args.normalize),
"FT_imag_norm": normalize_curve(ft_imag, args.normalize),
"FT_amplitude_norm": normalize_curve(ft_amp, args.normalize),
"FT_power_norm": normalize_curve(ft_power, args.normalize),
})
df_fft.to_excel(writer, sheet_name="FFT", index=False)
summary_items += ["main_peak_R_FFT_amplitude [A]", "main_peak_FFT_amplitude"]
summary_values += [peak_R_fft, peak_y_fft]
if mem_result is not None:
R_mem, freqs_mem_sample, mem_psd = mem_result
peak_R_mem, peak_y_mem = find_main_peak(R_mem, mem_psd)
df_mem = pd.DataFrame({
"Frequency_MEM_cycles_per_sample": freqs_mem_sample,
"R_A": R_mem,
"MEM_PSD": mem_psd,
"MEM_PSD_norm": normalize_curve(mem_psd, args.normalize),
})
df_mem.to_excel(writer, sheet_name="MEM", index=False)
summary_items += ["main_peak_R_MEM [A]", "main_peak_MEM_PSD"]
summary_values += [peak_R_mem, peak_y_mem]
if scan_results:
rows = []
scan_curve_columns = {}
first_R = None
for sr in scan_results:
label = sr["label"]
scan_value = sr["scan_value"]
rargs = sr["args"]
if args.mode == "fft":
R, y, yname = y_from_fft_result(sr["fft_result"], args.fft_plot)
elif args.mode == "mem":
R, y, yname = y_from_mem_result(sr["mem_result"])
else:
# In both mode, scan table uses FFT amplitude as primary if present.
R, y, yname = y_from_fft_result(sr["fft_result"], args.fft_plot)
y_norm = normalize_curve(y, args.normalize)
peak_R, peak_y = find_main_peak(R, y)
peak_R_norm, peak_y_norm = find_main_peak(R, y_norm)
rows.append({
"scan_parameter": scan_param,
"scan_value": scan_value,
"label": label,
"mode": args.mode,
"y_name": yname,
"order": sr["order_selected"],
"npadding": rargs.npadding,
"nfft": sr["nfft"],
"korder": rargs.korder,
"hrange": str(rargs.hrange),
"hwidth": rargs.hwidth,
"peak_R_A": peak_R,
"peak_y": peak_y,
"normalize": args.normalize,
"peak_R_A_norm": peak_R_norm,
"peak_y_norm": peak_y_norm,
})
if first_R is None:
first_R = R
scan_curve_columns["R_A"] = R
if len(R) == len(first_R) and np.allclose(R, first_R, rtol=0, atol=1e-12):
scan_curve_columns[label] = y_norm
if args.normalize != "none":
scan_curve_columns[f"raw_{label}"] = y
else:
# Different npadding gives different R mesh.
# Save R/y pair as separate columns.
scan_curve_columns[f"R_A_{label}"] = R
scan_curve_columns[f"Y_{label}"] = y_norm
if args.normalize != "none":
scan_curve_columns[f"Y_raw_{label}"] = y
pd.DataFrame(rows).to_excel(writer, sheet_name="ScanSummary", index=False)
pd.DataFrame(dict([(k2, pd.Series(v2)) for k2, v2 in scan_curve_columns.items()])).to_excel(
writer, sheet_name="ScanCurves", index=False
)
df_summary = pd.DataFrame({"item": summary_items, "value": summary_values})
df_summary.to_excel(writer, sheet_name="Summary", index=False)
# ============================================================
# Plot
# ============================================================
[ドキュメント]
def plot_results(args, k, signal_raw, result, scan_results=None, scan_param=None):
"""
入力信号と窓関数、変換結果をプロットします。
スキャン結果が与えられた場合は、その曲線も重ねて表示します。
詳細説明:
matplotlibを使用して2つのサブプロットを生成します。
上段のサブプロットは、生の信号、前処理された信号、およびハニング窓関数を表示します。
下段のサブプロットは、FFTまたはMEMの変換結果を表示します。
`scan_results`が提供されている場合、各スキャンにおける変換曲線がオーバーレイされます。
正規化オプション、R軸の範囲制限、対数スケールなどのプロットオプションがサポートされます。
`--savefig`が指定されている場合、プロットは画像ファイルとして保存されます。
:param args: コマンドライン引数を格納する名前空間オブジェクト。
:type args: argparse.Namespace
:param k: k軸データ。
:type k: numpy.ndarray
:param signal_raw: 生の信号データ。
:type signal_raw: numpy.ndarray
:param result: 単一の変換結果を含む辞書。`run_transform_for_args`の戻り値。
:type result: dict
:param scan_results: スキャンモードの場合の各スキャン結果のリスト。デフォルトはNone。
:type scan_results: Optional[list[dict]]
:param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。
:type scan_param: Optional[str]
:returns: None
"""
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(9, 8))
ax0 = axes[0]
ax0.plot(k, signal_raw, label="raw signal", linewidth=1.2)
ax0.plot(k, result["signal_used"], label="used signal", linewidth=1.2)
ax0b = ax0.twinx()
ax0b.plot(k, result["window"], linestyle="--", linewidth=1.0, label="Hanning window")
ax0b.set_ylabel("window")
ax0.set_xlabel("k [A$^{-1}$]")
ax0.set_ylabel("signal")
ax0.set_title("Input and processed signal")
ax0.grid(True)
lines0, labels0 = ax0.get_legend_handles_labels()
lines1, labels1 = ax0b.get_legend_handles_labels()
ax0.legend(lines0 + lines1, labels0 + labels1, loc="best")
ax1 = axes[1]
if scan_results:
for sr in scan_results:
label = sr["label"]
if args.mode == "fft":
R, y, _ = y_from_fft_result(sr["fft_result"], args.fft_plot)
y_plot = normalize_curve(y, args.normalize)
ax1.plot(R, y_plot, linewidth=1.2, label=label)
elif args.mode == "mem":
R, y, _ = y_from_mem_result(sr["mem_result"])
y_plot = normalize_curve(y, args.normalize)
ax1.plot(R, y_plot, linewidth=1.2, label=label)
else:
R, y, _ = y_from_fft_result(sr["fft_result"], args.fft_plot)
y_plot = normalize_curve(y, args.normalize)
ax1.plot(R, y_plot, linewidth=1.0, label=f"FFT {label}")
Rm, ym, _ = y_from_mem_result(sr["mem_result"])
ym_plot = normalize_curve(ym, args.normalize)
ax1.plot(Rm, ym_plot, linestyle="--", linewidth=1.0, label=f"MEM {label}")
ax1.set_title(f"EXAFS transform scan: {scan_param}")
else:
fft_result = result["fft_result"]
mem_result = result["mem_result"]
order_selected = result["order_selected"]
if fft_result is not None:
R_fft, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result
if args.fft_plot == "realimag":
ax1.plot(R_fft, normalize_curve(ft_real, args.normalize), label=normalized_label("FFT real", args.normalize), linewidth=1.2)
ax1.plot(R_fft, normalize_curve(ft_imag, args.normalize), label=normalized_label("FFT imag", args.normalize), linewidth=1.2)
y_for_peak = normalize_curve(ft_amp, args.normalize)
elif args.fft_plot == "power":
y_plot = normalize_curve(ft_power, args.normalize)
ax1.plot(R_fft, y_plot, label=normalized_label("FFT power", args.normalize), linewidth=1.2)
y_for_peak = y_plot
else:
y_plot = normalize_curve(ft_amp, args.normalize)
ax1.plot(R_fft, y_plot, label=normalized_label("FFT amplitude", args.normalize), linewidth=1.2)
y_for_peak = y_plot
peak_R_fft, peak_y_fft = find_main_peak(R_fft, y_for_peak)
if np.isfinite(peak_R_fft):
ax1.axvline(peak_R_fft, linestyle="--", linewidth=0.8)
if mem_result is not None:
R_mem, freqs_mem_sample, mem_psd = mem_result
mem_psd_plot = normalize_curve(mem_psd, args.normalize)
ax1.plot(R_mem, mem_psd_plot, label=normalized_label(f"MEM PSD, order={order_selected}", args.normalize), linewidth=1.8)
peak_R_mem, peak_y_mem = find_main_peak(R_mem, mem_psd_plot)
if np.isfinite(peak_R_mem):
ax1.axvline(peak_R_mem, linestyle=":", linewidth=0.8)
ax1.set_title("EXAFS transform")
ax1.set_xlabel("R [A]")
ax1.set_ylabel("FT / PSD" if args.normalize == "none" else "Normalized FT / PSD")
ax1.grid(True)
ax1.legend()
if args.yscale == "log":
ax1.set_yscale("log")
if args.xlim is not None:
ax1.set_xlim(args.xlim[0], args.xlim[1])
plt.tight_layout()
if args.savefig:
plt.savefig(args.savefig, dpi=300)
print(f"Figure saved to {args.savefig}")
if not args.no_show:
plt.show()
# ============================================================
# Main
# ============================================================
[ドキュメント]
def main():
"""
スクリプトのメイン実行関数です。
詳細説明:
コマンドライン引数を解析し、データの読み込み、k軸間隔の推定、
FFT/MEM変換の実行、パラメータスキャン処理、結果のExcelファイルへの保存、
およびMatplotlibによる結果のプロットを行います。
エラーが発生した場合は、適切なメッセージを出力し、プログラムを終了します。
:returns: なし
:rtype: None
"""
parser = argparse.ArgumentParser(
description=(
"EXAFS-oriented FFT/MEM transform. "
"Input column 0 is k [A^-1], column 1 is chi(k)-like signal."
)
)
parser.add_argument("infile", help="Input file: .xlsx, .csv, .txt, or .dat")
parser.add_argument(
"preset_order",
nargs="?",
type=int,
default=8,
help="Fallback/fixed AR model order for MEM. Default: 8.",
)
parser.add_argument("--mode", choices=["fft", "mem", "both"], default="fft")
parser.add_argument("--sheet", default=0, help="Excel sheet name or index. Default: 0.")
parser.add_argument("--xcol", type=int, default=0, help="k-axis column index, 0-based.")
parser.add_argument("--ycol", type=int, default=1, help="signal column index, 0-based.")
parser.add_argument(
"--npadding",
type=int,
default=1024,
help="FFT/MEM NFFT length. If smaller than data length, data length is used.",
)
parser.add_argument(
"--korder",
type=float,
default=0.0,
help="Apply k^korder to input signal before transform. If input is already k^2 chi(k), use 0.",
)
parser.add_argument(
"--hrange",
type=float,
nargs=2,
default=None,
metavar=("KSTART", "KEND"),
help="Hanning window k range [A^-1], e.g. --hrange 2 12.",
)
parser.add_argument("--hwidth", type=float, default=0.5, help="Hanning taper width [A^-1].")
parser.add_argument("--remove-mean", action="store_true", help="Subtract mean after k-weighting and before windowing.")
parser.add_argument("--max-order", type=int, default=40, help="Maximum AR order tested by AIC/BIC.")
parser.add_argument(
"--order-method",
choices=["bic", "aic", "preset"],
default="preset",
help="AR order selection method for MEM. Default: preset.",
)
# Direct fixed-order option, more readable than positional preset_order.
parser.add_argument(
"--order",
type=int,
default=None,
help="Fixed AR order for MEM. Overrides positional preset_order and --order-method.",
)
# Scan parameters: START STEP N
parser.add_argument("--scan_order", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan MEM order. Example: --scan_order 10 10 5 gives 10,20,30,40,50.")
parser.add_argument("--scan_hmin", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan lower bound of --hrange.")
parser.add_argument("--scan_hmax", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan upper bound of --hrange.")
parser.add_argument("--scan_hwidth", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan Hanning taper width.")
parser.add_argument("--scan_korder", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan k-weight order.")
parser.add_argument("--scan_npadding", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"),
help="Scan zero-padding NFFT length.")
parser.add_argument("--yscale", choices=["linear", "log"], default="linear")
parser.add_argument("--fft-plot", choices=["amplitude", "power", "realimag"], default="amplitude")
parser.add_argument(
"--normalize",
choices=["none", "max"],
default="none",
help="Normalize each plotted/saved comparison curve. 'max' divides each curve by max(abs(y)).",
)
parser.add_argument("--xlim", type=float, nargs=2, default=None, help="R-axis plot range, e.g. --xlim 0 6.")
parser.add_argument("--outfile", default="", help="Output Excel file. Default: auto-generated.")
parser.add_argument("--savefig", default="", help="Save figure filename, e.g. result.png.")
parser.add_argument("--no-show", action="store_true", help="Do not show matplotlib window.")
args = parser.parse_args()
if args.order is not None:
args.preset_order = args.order
args.order_method = "preset"
try:
args.sheet = int(args.sheet)
except Exception:
pass
try:
scan_param, scan_vals = get_active_scan(args)
except Exception as e:
print(f"Error in scan options: {e}")
sys.exit(1)
try:
k, signal_raw = load_xy(args.infile, sheet=args.sheet, xcol=args.xcol, ycol=args.ycol)
except Exception as e:
print(f"Error loading input file: {e}")
sys.exit(1)
try:
dk, rel_std_dk = estimate_dk(k)
except Exception as e:
print(f"Error estimating dk: {e}")
sys.exit(1)
print()
print("Input data summary")
print(f" file = {args.infile}")
print(f" mode = {args.mode}")
print(f" n_data = {len(k)}")
print(f" k_min = {np.min(k):.10g} [A^-1]")
print(f" k_max = {np.max(k):.10g} [A^-1]")
print(f" dk = {dk:.10g} [A^-1]")
print(f" rel std dk = {rel_std_dk:.3e}")
print(f" scan_param = {scan_param}")
print(f" scan_values = {scan_vals}")
print(f" normalize = {args.normalize}")
print()
try:
result = run_transform_for_args(args, k, signal_raw, dk, quiet=False)
except Exception as e:
print(f"Error calculating transform: {e}")
sys.exit(1)
scan_results = None
if scan_param is not None:
scan_results = []
print("Scan calculation")
for value in scan_vals:
try:
run_args = apply_scan_value(args, scan_param, value, k)
if scan_param == "order" and run_args.mode == "fft":
print("WARNING: --scan_order has no effect in --mode fft. Use --mode mem or --mode both.")
sr = run_transform_for_args(run_args, k, signal_raw, dk, quiet=True)
sr["scan_value"] = value
sr["label"] = f"{scan_param}={value:g}"
scan_results.append(sr)
if sr["mem_result"] is not None:
Rm, _, ym = sr["mem_result"]
pr, py = find_main_peak(Rm, ym)
print(f" {sr['label']}: MEM peak R={pr:.6g} A, y={py:.6g}")
if sr["fft_result"] is not None:
Rf, _, _, _, ya, _ = sr["fft_result"]
pr, py = find_main_peak(Rf, ya)
print(f" {sr['label']}: FFT peak R={pr:.6g} A, y={py:.6g}")
except Exception as e:
print(f" {scan_param}={value}: ERROR: {e}")
print()
if args.outfile:
outfile = args.outfile
else:
outfile = make_output_filename(
args.infile,
args.mode,
result["order_selected"] if result["order_selected"] is not None else args.preset_order,
args.korder,
scan_param=scan_param,
)
try:
save_results_excel(
outfile,
args,
k,
signal_raw,
dk,
rel_std_dk,
result,
scan_results=scan_results,
scan_param=scan_param,
)
print(f"Results saved to {outfile}")
except Exception as e:
print(f"Error saving Excel file: {e}")
plot_results(args, k, signal_raw, result, scan_results=scan_results, scan_param=scan_param)
input("\nPress ENTER to terminate>>\n")
if __name__ == "__main__":
main()