spectrum_.ft_exafs_scan のソースコード

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
ft_exafs_scan.py

EXAFSデータのk空間におけるFFT/MEM変換ツールです。

入力データ:
    Column 0 : k [A^-1]
    Column 1 : signal, 通常 chi(k), k^2 chi(k) など。

主な機能:
    - EXAFS R軸補正: chi(k) ~ sin(2 k R + phase) に基づき、R = pi * frequency を使用。
    - 変換モード: fft, mem, both のいずれかを選択可能。
    - FFT出力: real, imag, amplitude, power を選択可能。
    - ゼロパディング: --npadding オプションで制御(デフォルト 1024)。
    - k重み付け: --korder オプションで制御。
    - EXAFSスタイルのハニング窓: --hrange と --hwidth オプションで制御。
    - パラメータスキャンオーバーレイ:
        --scan_order START STEP N
        --scan_hmin START STEP N
        --scan_hmax START STEP N
        --scan_korder START STEP N
        --scan_npadding START STEP N
    - オプションの正規化:
        --normalize none|max

使用例:
    # 通常のFFT
    python ft_exafs_scan.py data.xlsx --mode fft --korder 0 --hrange 2 12 --hwidth 0.5

    # MEM, 固定次数 20
    python ft_exafs_scan.py data.xlsx 20 --mode mem --korder 0 --hrange 2 12 --hwidth 0.5

    # MEM次数をスキャン: 10,20,30,40,50
    python ft_exafs_scan.py data.xlsx --mode mem --scan_order 10 10 5 --korder 0 --hrange 2 12 --hwidth 0.5

    # ハニング窓の幅をスキャン: 0.2,0.4,0.6,0.8,1.0
    python ft_exafs_scan.py data.xlsx --mode fft --scan_hwidth 0.2 0.2 5 --korder 0 --hrange 2 12

    # 形状比較のために各曲線を最大値1に正規化してスキャン
    python ft_exafs_scan.py data.xlsx --mode mem --scan_order 20 20 5 --normalize max

関連リンク:
    :doc:`ft_exafs_scan_usage`
"""

import argparse
import copy
import os
import sys
import warnings

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from spectrum import pburg
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.stattools import pacf

warnings.filterwarnings("ignore")


# ============================================================
# Input
# ============================================================

[ドキュメント] def load_xy(infile, sheet=0, xcol=0, ycol=1): """ .xlsx/.csv/.txt/.datファイルからk軸と信号の列を読み込みます。 詳細説明: 指定された入力ファイルパスから、k軸と信号のデータを読み込みます。 ファイル形式はExcel (.xlsx)、CSV (.csv)、テキスト (.txt, .dat) をサポートします。 欠損値 (NaN, Inf) は除去され、データポイントが少ない場合はエラーとなります。 k軸が単調増加でない場合はソートされます。 :param infile: 入力ファイルへのパス。 :type infile: str :param sheet: Excelファイルの場合のシート名またはシートインデックス。デフォルトは0。 :type sheet: Union[str, int] :param xcol: k軸データの列インデックス(0から始まる)。デフォルトは0。 :type xcol: int :param ycol: 信号データの列インデックス(0から始まる)。デフォルトは1。 :type ycol: int :returns: k軸データと信号データ。 :rtype: tuple[numpy.ndarray, numpy.ndarray] :raises ValueError: サポートされていないファイル形式、2Dテーブルでないテキスト入力、 指定された列インデックスが存在しない、有効なデータポイントが少ない、 などの場合に発生します。 """ ext = os.path.splitext(infile)[1].lower() if ext == ".xlsx": df = pd.read_excel(infile, sheet_name=sheet) k = df.iloc[:, xcol].to_numpy(dtype=float) signal = df.iloc[:, ycol].to_numpy(dtype=float) elif ext == ".csv": df = pd.read_csv(infile) k = df.iloc[:, xcol].to_numpy(dtype=float) signal = df.iloc[:, ycol].to_numpy(dtype=float) elif ext in [".txt", ".dat"]: arr = np.loadtxt(infile) if arr.ndim != 2: raise ValueError("Text input must be a 2D table.") if arr.shape[1] <= max(xcol, ycol): raise ValueError( f"Text input must have at least {max(xcol, ycol) + 1} columns." ) k = arr[:, xcol].astype(float) signal = arr[:, ycol].astype(float) else: raise ValueError("Unsupported file format. Use .xlsx, .csv, .txt, or .dat.") mask = np.isfinite(k) & np.isfinite(signal) k = k[mask] signal = signal[mask] if len(k) < 4: raise ValueError("Not enough valid data points.") if np.any(np.diff(k) < 0): print("WARNING: k-axis is not monotonic. Sorting by k.") idx = np.argsort(k) k = k[idx] signal = signal[idx] return k, signal
[ドキュメント] def estimate_dk(k, rtol=1e-3): """ k軸からk間隔(dk)を推定します。 詳細説明: k軸の隣接する値の差分の平均値からk間隔を推定します。 k軸の均一性を相対標準偏差で評価し、均一でない場合は警告を表示します。 FFT/MEMは均一にサンプリングされたkデータを仮定するため、不均一な場合は注意が必要です。 :param k: k軸データ。 :type k: numpy.ndarray :param rtol: k間隔の差分の相対標準偏差の許容範囲。この値を超えると警告が表示されます。デフォルトは1e-3。 :type rtol: float :returns: 推定されたk間隔と、k間隔の差分の相対標準偏差。 :rtype: tuple[float, float] :raises ValueError: dkが0の場合に発生します。 """ dk_arr = np.diff(k) dk = np.mean(dk_arr) if dk == 0: raise ValueError("dk is zero.") rel_std = np.std(dk_arr) / abs(dk) if rel_std > rtol: print("WARNING: k-axis is not perfectly uniform.") print(f" mean dk = {dk:.10g}") print(f" std(diff) = {np.std(dk_arr):.10g}") print(f" relative std = {rel_std:.3e}") print(" FFT/MEM assume uniformly sampled k data.") print(" Consider interpolation onto a uniform k grid if needed.") return dk, rel_std
# ============================================================ # Window and preprocessing # ============================================================
[ドキュメント] def make_hanning_range_window(k, hrange=None, hwidth=0.5): """ k軸にEXAFSスタイルのハニング窓を作成します。 詳細説明: 指定されたk範囲(`hrange`)と窓幅(`hwidth`)に基づいて、ハニング窓を生成します。 `hrange`が指定されない場合、すべてのkに対して1.0の窓関数が返されます。 `hwidth`が0の場合、矩形窓が生成されます。 `2.0 * hwidth`が`hrange`の幅よりも大きい場合、`hwidth`は自動調整されます。 :param k: k軸データ。 :type k: numpy.ndarray :param hrange: ハニング窓を適用するk範囲 [KSTART, KEND]。デフォルトはNone。 :type hrange: Optional[list[float]] :param hwidth: ハニング窓のテーパー幅 [A^-1]。デフォルトは0.5。 :type hwidth: float :returns: k軸と同じ長さのハニング窓配列。 :rtype: numpy.ndarray :raises ValueError: `hrange`に2つの異なる値が必要な場合、または`hwidth`が0未満の場合に発生します。 """ k = np.asarray(k, dtype=float) if hrange is None: return np.ones_like(k, dtype=float) k1 = float(hrange[0]) k2 = float(hrange[1]) if k2 < k1: k1, k2 = k2, k1 if k2 <= k1: raise ValueError("--hrange requires two different values: KSTART KEND") if hwidth < 0: raise ValueError("--hwidth must be >= 0") if 2.0 * hwidth > (k2 - k1): new_hwidth = 0.5 * (k2 - k1) print("WARNING: 2*hwidth is larger than hrange width.") print(f" hwidth changed from {hwidth:.10g} to {new_hwidth:.10g}") hwidth = new_hwidth win = np.zeros_like(k, dtype=float) if hwidth == 0: win[(k >= k1) & (k <= k2)] = 1.0 return win left = (k >= k1) & (k < k1 + hwidth) flat = (k >= k1 + hwidth) & (k <= k2 - hwidth) right = (k > k2 - hwidth) & (k <= k2) win[left] = 0.5 * (1.0 - np.cos(np.pi * (k[left] - k1) / hwidth)) win[flat] = 1.0 win[right] = 0.5 * (1.0 + np.cos(np.pi * (k[right] - (k2 - hwidth)) / hwidth)) return win
[ドキュメント] def preprocess_signal(k, signal_raw, korder=0.0, hrange=None, hwidth=0.5, remove_mean=False): """ k重み付け、オプションの平均減算、ハニング窓を適用して信号を前処理します。 詳細説明: 入力された生の信号に、`korder`に基づくk重み付けを適用します。 例えば、入力がchi(k)の場合、`korder=2`を指定するとk^2 chi(k)になります。 その後、`remove_mean`がTrueの場合、k重み付けされた信号から平均値が減算されます。 最後に、`make_hanning_range_window`で作成されたハニング窓が適用され、 変換に使用される最終的な信号が生成されます。 :param k: k軸データ。 :type k: numpy.ndarray :param signal_raw: 生の信号データ。 :type signal_raw: numpy.ndarray :param korder: 信号に適用するk重み付けの次数。デフォルトは0.0。 :type korder: float :param hrange: ハニング窓を適用するk範囲 [KSTART, KEND]。デフォルトはNone。 :type hrange: Optional[list[float]] :param hwidth: ハニング窓のテーパー幅 [A^-1]。デフォルトは0.5。 :type hwidth: float :param remove_mean: k重み付け後に信号の平均値を減算するかどうか。デフォルトはFalse。 :type remove_mean: bool :returns: 前処理された信号、k重み付けされた信号、平均値が減算された信号、および適用されたハニング窓。 :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] """ k = np.asarray(k, dtype=float) signal_raw = np.asarray(signal_raw, dtype=float) kweight = np.power(k, korder) signal_kweighted = signal_raw * kweight if remove_mean: signal_centered = signal_kweighted - np.mean(signal_kweighted) else: signal_centered = signal_kweighted.copy() window = make_hanning_range_window(k, hrange=hrange, hwidth=hwidth) signal_used = signal_centered * window return signal_used, signal_kweighted, signal_centered, window
# ============================================================ # AR order selection # ============================================================
[ドキュメント] def select_ar_order(signal, preset_order=8, max_order=40, order_method="bic"): """ AIC/BICまたは事前設定値に基づいてARモデルの次数を選択します。 詳細説明: 自己回帰 (AR) モデルの次数を選択するために、部分自己相関関数 (PACF)、 赤池情報量基準 (AIC)、ベイズ情報量基準 (BIC) を使用します。 PACFは診断情報として表示されますが、主要な次数選択にはAIC/BICが使われます。 `order_method`に基づいて、最適な次数が選択されます。 :param signal: ARモデルを適用する信号データ。 :type signal: numpy.ndarray :param preset_order: `order_method`が"preset"の場合に適用される、事前設定のAR次数。デフォルトは8。 :type preset_order: int :param max_order: AIC/BICでテストするARモデルの最大次数。デフォルトは40。 :type max_order: int :param order_method: AR次数選択方法。"bic", "aic", "preset"のいずれか。デフォルトは"bic"。 :type order_method: str :returns: 選択されたAR次数と、AIC/BICの情報を含む辞書。 :rtype: tuple[int, dict] :raises ValueError: `order_method`がサポートされていない値の場合に発生します。 """ n = len(signal) max_lag = min(max_order, n // 2) try: pacf_vals, confint = pacf(signal, nlags=max_lag, alpha=0.05) print(f"PACF with 95% CI for lags 1..{max_lag}:") significant_lags = [] for lag in range(1, max_lag + 1): lower, upper = confint[lag] val = pacf_vals[lag] sig = (lower > 0.0) or (upper < 0.0) marker = "*" if sig else " " print( f"lag={lag:2d}: pacf={val: .4f} " f"CI=({lower: .4f},{upper: .4f}) {marker}" ) if sig: significant_lags.append(lag) if significant_lags: print("PACF significant lags, diagnostic only: " f"{significant_lags[:10]}") else: print("No PACF spike outside zero-confidence interval.") print("PACF is not used as the primary order selector.\n") except Exception as e: print(f"PACF calculation failed: {e}") print("Continue with AIC/BIC order selection.\n") best_aic = np.inf best_bic = np.inf best_p_aic = None best_p_bic = None max_ar = min(max_order, n - 1) for p in range(1, max_ar + 1): try: model = AutoReg(signal, lags=p, old_names=False).fit() if model.aic < best_aic: best_aic = model.aic best_p_aic = p if model.bic < best_bic: best_bic = model.bic best_p_bic = p except Exception: pass print(f"Selected order by AIC: {best_p_aic}, AIC={best_aic:.6g}") print(f"Selected order by BIC: {best_p_bic}, BIC={best_bic:.6g}\n") if order_method == "bic": if best_p_bic is not None: order_selected = best_p_bic elif best_p_aic is not None: order_selected = best_p_aic else: order_selected = preset_order elif order_method == "aic": if best_p_aic is not None: order_selected = best_p_aic elif best_p_bic is not None: order_selected = best_p_bic else: order_selected = preset_order elif order_method == "preset": order_selected = preset_order else: raise ValueError("order_method must be one of: bic, aic, preset") print(f"Order selection method = {order_method}") print(f"Using AR model order = {order_selected}\n") return order_selected, { "order_method": order_method, "best_p_aic": best_p_aic, "best_aic": best_aic, "best_p_bic": best_p_bic, "best_bic": best_bic, }
# ============================================================ # FFT / MEM # ============================================================
[ドキュメント] def calc_fft_exafs(signal_used, dk, nfft): """ EXAFSスタイルのFFTを計算します。 詳細説明: FFTは `np.fft.rfft` を使用して計算されます。 EXAFSでは `chi(k) ~ sin(2 k R + phi)` の関係があるため、R軸は `R = pi * frequency` と定義されます。 積分スケールを模倣するために、FTの結果は `dk` を乗算されます。 結果として、複素FFT、実数部、虚数部、振幅、パワーが返されます。 :param signal_used: 前処理された信号データ。 :type signal_used: numpy.ndarray :param dk: k軸の平均k間隔。 :type dk: float :param nfft: FFTのNFFT長(ゼロパディングを含む)。 :type nfft: int :returns: EXAFS R軸、複素FFT、FFTの実数部、FFTの虚数部、FFTの振幅、FFTのパワー。 :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] """ ft_complex = np.fft.rfft(signal_used, n=nfft) * dk freq = np.fft.rfftfreq(nfft, d=dk) R = np.pi * freq ft_real = ft_complex.real ft_imag = ft_complex.imag ft_amp = np.abs(ft_complex) ft_power = ft_amp ** 2 return R, ft_complex, ft_real, ft_imag, ft_amp, ft_power
[ドキュメント] def calc_mem_exafs(signal_used, dk, order, nfft): """ `spectrum.pburg` を使用してMEMスペクトルを計算します。 詳細説明: Burg法(`spectrum.pburg`)を用いて最大エントロピー法 (MEM) スペクトルを計算します。 `pburg.frequencies()` は `cycles/sample` の単位で周波数を返すため、 EXAFSのR軸 (`R = pi * frequency / dk`) に変換されます。 :param signal_used: 前処理された信号データ。 :type signal_used: numpy.ndarray :param dk: k軸の平均k間隔。 :type dk: float :param order: ARモデルの次数。 :type order: int :param nfft: FFTのNFFT長(ゼロパディングを含む)。 :type nfft: int :returns: EXAFS R軸、サンプルあたりの周波数、MEMスペクトルのパワースペクトル密度 (PSD)。 :rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] """ burg_spec = pburg(signal_used, order=order, NFFT=nfft) mem_psd = np.asarray(burg_spec.psd, dtype=float) freqs_sample = np.asarray(burg_spec.frequencies(), dtype=float) # pburg.frequencies() is cycles/sample. Convert to R for EXAFS. R = np.pi * freqs_sample / dk return R, freqs_sample, mem_psd
# ============================================================ # Utilities # ============================================================
[ドキュメント] def find_main_peak(R, y, rmin=1e-12): """ R=0を除外してメインピークを見つけます。 詳細説明: 指定されたR軸とyデータから、Rが`rmin`よりも大きい範囲でyの最大値を見つけ、 そのピークのR値とy値を返します。 :param R: R軸データ。 :type R: numpy.ndarray :param y: 変換された信号(振幅、PSDなど)。 :type y: numpy.ndarray :param rmin: ピーク検索から除外するR軸の最小値。デフォルトは1e-12。 :type rmin: float :returns: メインピークのR値とy値。ピークが見つからない場合はNaN。 :rtype: tuple[float, float] """ R = np.asarray(R) y = np.asarray(y) mask = np.isfinite(R) & np.isfinite(y) & (R > rmin) if not np.any(mask): return np.nan, np.nan R2 = R[mask] y2 = y[mask] idx = np.argmax(y2) return R2[idx], y2[idx]
[ドキュメント] def normalize_curve(y, method="none"): """ プロットや比較のために1つの曲線を正規化します。 詳細説明: `method`が"none"の場合、入力`y`は変更されずに返されます。 `method`が"max"の場合、`y`の有限値の絶対値の最大値で`y`全体を割って正規化します。 これは、MEMスペクトルのスケールがAR次数によって大きく変化する場合の比較に特に有用です。 :param y: 正規化する曲線データ。 :type y: numpy.ndarray :param method: 正規化方法。"none"または"max"。デフォルトは"none"。 :type method: str :returns: 正規化された曲線データ。 :rtype: numpy.ndarray :raises ValueError: `method`が"none"または"max"以外の値の場合に発生します。 """ y = np.asarray(y, dtype=float) if method == "none": return y.copy() if method != "max": raise ValueError("normalize must be 'none' or 'max'") out = y.copy() mask = np.isfinite(out) if not np.any(mask): return out ymax = np.nanmax(np.abs(out[mask])) if ymax > 0: out = out / ymax return out
[ドキュメント] def normalized_label(label, normalize): """ プロットラベルに正規化タグを追加します。 詳細説明: 正規化方法が"max"の場合、元のラベルに `/ max` を追加して返します。 それ以外の正規化方法の場合は、元のラベルをそのまま返します。 :param label: 元のプロットラベル。 :type label: str :param normalize: 正規化方法。 :type normalize: str :returns: 正規化タグが追加された(または変更なしの)ラベル。 :rtype: str """ if normalize == "max": return f"{label} / max" return label
[ドキュメント] def make_output_filename(infile, mode, order, korder, scan_param=None): """ 出力Excelファイル名を生成します。 詳細説明: 入力ファイル名、変換モード、MEM次数、k重み付け次数、およびオプションのスキャンパラメータに基づいて、 自動的に出力ファイル名を生成します。 :param infile: 入力ファイルへのパス。 :type infile: str :param mode: 変換モード ("fft", "mem", "both")。 :type mode: str :param order: MEMのAR次数。 :type order: int :param korder: k重み付けの次数。 :type korder: float :param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。 :type scan_param: Optional[str] :returns: 生成された出力ファイル名。 :rtype: str """ stem, _ = os.path.splitext(infile) scan_tag = "" if scan_param is None else f"_scan_{scan_param}" if mode == "mem": return f"{stem}_exafs_mem_order={order}_korder={korder:g}{scan_tag}.xlsx" elif mode == "fft": return f"{stem}_exafs_fft_korder={korder:g}{scan_tag}.xlsx" else: return f"{stem}_exafs_fft_mem_order={order}_korder={korder:g}{scan_tag}.xlsx"
[ドキュメント] def scan_values(start, step, n, integer=False): """ スキャンパラメータの値を生成します。 詳細説明: 開始値 (`start`)、ステップサイズ (`step`)、およびステップ数 (`n`) に基づいて、 等間隔の数値のリストを生成します。`integer`がTrueの場合、値は整数に丸められます。 :param start: スキャン開始値。 :type start: float :param step: 各ステップでの増分値。 :type step: float :param n: 生成する値の数。 :type n: int :param integer: 生成された値を整数に丸めるかどうか。デフォルトはFalse。 :type integer: bool :returns: 生成されたスキャン値のリスト。 :rtype: list[float] or list[int] """ vals = [start + step * i for i in range(int(n))] if integer: vals = [int(round(v)) for v in vals] return vals
[ドキュメント] def get_active_scan(args): """ アクティブなスキャンパラメータとその値を取得します。 詳細説明: 同時に指定できるスキャンパラメータは1つだけです。 複数のスキャンパラメータが指定された場合、エラーを発生させます。 選択されたスキャンパラメータとそれに対応する値のリストを返します。 :param args: コマンドライン引数を格納する名前空間オブジェクト。 :type args: argparse.Namespace :returns: アクティブなスキャンパラメータの名前と、それに対応する値のリスト。 アクティブなスキャンがない場合は (None, None) を返します。 :rtype: tuple[Optional[str], Optional[list[Union[int, float]]]] :raises ValueError: 複数のスキャンパラメータが指定された場合、または有効なスキャン値が生成されない場合に発生します。 """ specs = { "order": args.scan_order, "hmin": args.scan_hmin, "hmax": args.scan_hmax, "hwidth": args.scan_hwidth, "korder": args.scan_korder, "npadding": args.scan_npadding, } active = [(name, spec) for name, spec in specs.items() if spec is not None] if len(active) == 0: return None, None if len(active) > 1: names = ", ".join(name for name, _ in active) raise ValueError( "Only one scan parameter can be specified at a time. " f"Active scans: {names}" ) name, spec = active[0] start, step, n = spec integer = name in ["order", "npadding"] vals = scan_values(start, step, int(n), integer=integer) if name == "order": vals = [v for v in vals if v > 0] elif name == "npadding": vals = [v for v in vals if v > 0] if len(vals) == 0: raise ValueError(f"No valid scan values for {name}.") return name, vals
[ドキュメント] def apply_scan_value(args, scan_param, value, k): """ 渡されたスキャンパラメータの値を `args` 名前空間に適用し、そのコピーを返します。 詳細説明: 元の `args` オブジェクトを変更しないように、まずコピーを作成します。 次に、`scan_param`に応じて対応する`args`属性に`value`を適用します。 `hrange`が設定されていない場合は、k軸の全範囲で初期化されます。 :param args: コマンドライン引数を格納する元の名前空間オブジェクト。 :type args: argparse.Namespace :param scan_param: 変更するスキャンパラメータの名前。 :type scan_param: str :param value: 適用するスキャンパラメータの値。 :type value: Union[int, float] :param k: k軸データ。`hrange`の初期化に使用される場合があります。 :type k: numpy.ndarray :returns: 指定されたスキャン値が適用された `args` 名前空間のコピー。 :rtype: argparse.Namespace :raises ValueError: 未知のスキャンパラメータが指定された場合に発生します。 """ a = copy.copy(args) if scan_param == "order": a.preset_order = int(value) a.order_method = "preset" elif scan_param == "hmin": if a.hrange is None: a.hrange = [float(np.min(k)), float(np.max(k))] a.hrange = [float(value), float(a.hrange[1])] elif scan_param == "hmax": if a.hrange is None: a.hrange = [float(np.min(k)), float(np.max(k))] a.hrange = [float(a.hrange[0]), float(value)] elif scan_param == "hwidth": a.hwidth = float(value) elif scan_param == "korder": a.korder = float(value) elif scan_param == "npadding": a.npadding = int(value) else: raise ValueError(f"Unknown scan parameter: {scan_param}") return a
[ドキュメント] def y_from_fft_result(fft_result, fft_plot): """ FFT結果からプロットに適したR軸とyデータを抽出します。 詳細説明: `fft_plot`引数の値に基づいて、FFTの振幅、パワー、または実数部を返します。 :param fft_result: `calc_fft_exafs`関数から返されたFFT結果のタプル。 :type fft_result: tuple :param fft_plot: プロットするFFTの種類。"amplitude", "power", "realimag"のいずれか。 :type fft_plot: str :returns: R軸、プロットするyデータ、yデータの名前。 :rtype: tuple[numpy.ndarray, numpy.ndarray, str] """ R, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result if fft_plot == "power": return R, ft_power, "FFT power" if fft_plot == "realimag": return R, ft_real, "FFT real" return R, ft_amp, "FFT amplitude"
[ドキュメント] def y_from_mem_result(mem_result): """ MEM結果からプロットに適したR軸とyデータを抽出します。 詳細説明: `calc_mem_exafs`関数から返されたMEM結果から、R軸とMEMのパワースペクトル密度 (PSD) を抽出します。 :param mem_result: `calc_mem_exafs`関数から返されたMEM結果のタプル。 :type mem_result: tuple :returns: R軸、プロットするMEM PSDデータ、yデータの名前。 :rtype: tuple[numpy.ndarray, numpy.ndarray, str] """ R, freqs_mem_sample, mem_psd = mem_result return R, mem_psd, "MEM PSD"
# ============================================================ # Single calculation # ============================================================
[ドキュメント] def run_transform_for_args(args, k, signal_raw, dk, quiet=False): """ 指定された引数に基づいて、前処理と選択された変換を実行します。 詳細説明: `preprocess_signal`関数を使用して入力信号を前処理します。 次に、`args.mode`に応じてFFT、MEM、または両方の変換を実行します。 MEM変換の場合、`select_ar_order`関数によりARモデルの次数が決定されます。 結果の概要は、`quiet`がFalseの場合にコンソールに出力されます。 :param args: コマンドライン引数を格納する名前空間オブジェクト。 :type args: argparse.Namespace :param k: k軸データ。 :type k: numpy.ndarray :param signal_raw: 生の信号データ。 :type signal_raw: numpy.ndarray :param dk: k軸の平均k間隔。 :type dk: float :param quiet: 結果の概要をコンソールに出力するかどうか。Trueの場合、出力は抑制されます。デフォルトはFalse。 :type quiet: bool :returns: 変換結果を含む辞書。前処理された信号、FFT/MEM結果、AR次数選択情報などが含まれます。 :rtype: dict """ signal_used, signal_kweighted, signal_centered, window = preprocess_signal( k, signal_raw, korder=args.korder, hrange=args.hrange, hwidth=args.hwidth, remove_mean=args.remove_mean, ) n_data = len(k) nfft = max(int(args.npadding), n_data) fft_result = None mem_result = None order_selected = None order_info = {} if args.mode in ["fft", "both"]: fft_result = calc_fft_exafs(signal_used, dk, nfft) if args.mode in ["mem", "both"]: order_selected, order_info = select_ar_order( signal_used, preset_order=args.preset_order, max_order=args.max_order, order_method=args.order_method, ) mem_result = calc_mem_exafs(signal_used, dk, order_selected, nfft) if not quiet: print("Transform result") print(f" nfft used = {nfft}") print(f" korder = {args.korder:g}") print(f" hrange = {args.hrange}") print(f" hwidth = {args.hwidth:g}") if mem_result is not None: R_mem, _, mem_psd = mem_result peak_R_mem, peak_y_mem = find_main_peak(R_mem, mem_psd) print(f" MEM order = {order_selected}") print(f" MEM peak = R={peak_R_mem:.10g} A, PSD={peak_y_mem:.10g}") if fft_result is not None: R_fft, _, _, _, ft_amp, _ = fft_result peak_R_fft, peak_y_fft = find_main_peak(R_fft, ft_amp) print(f" FFT peak = R={peak_R_fft:.10g} A, amp={peak_y_fft:.10g}") print() return { "args": args, "signal_used": signal_used, "signal_kweighted": signal_kweighted, "signal_centered": signal_centered, "window": window, "nfft": nfft, "fft_result": fft_result, "mem_result": mem_result, "order_selected": order_selected, "order_info": order_info, }
# ============================================================ # Save Excel # ============================================================
[ドキュメント] def save_results_excel( outfile, args, k, signal_raw, dk, rel_std_dk, result, scan_results=None, scan_param=None, ): """ 入力データ、FFT/MEM結果、およびスキャン結果をExcelファイルに保存します。 詳細説明: 結果は複数のシートに分割されて保存されます。 - `Input` シート: 生の信号、前処理された信号、窓関数など。 - `FFT` シート (FFTモードの場合): R軸、FFTの実数部、虚数部、振幅、パワー。正規化されたデータも含まれます。 - `MEM` シート (MEMモードの場合): R軸、MEMのPSD。正規化されたデータも含まれます。 - `ScanSummary` シート (スキャンモードの場合): 各スキャンパラメータ値でのピーク位置やパラメータの詳細。 - `ScanCurves` シート (スキャンモードの場合): 各スキャンにおける変換曲線のデータ。 - `Summary` シート: 変換に使用された主要なパラメータと情報。 :param outfile: 結果を保存するExcelファイルへのパス。 :type outfile: str :param args: コマンドライン引数を格納する名前空間オブジェクト。 :type args: argparse.Namespace :param k: k軸データ。 :type k: numpy.ndarray :param signal_raw: 生の信号データ。 :type signal_raw: numpy.ndarray :param dk: k軸の平均k間隔。 :type dk: float :param rel_std_dk: k間隔の差分の相対標準偏差。 :type rel_std_dk: float :param result: 単一の変換結果を含む辞書。`run_transform_for_args`の戻り値。 :type result: dict :param scan_results: スキャンモードの場合の各スキャン結果のリスト。デフォルトはNone。 :type scan_results: Optional[list[dict]] :param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。 :type scan_param: Optional[str] :returns: None """ signal_kweighted = result["signal_kweighted"] signal_centered = result["signal_centered"] window = result["window"] signal_used = result["signal_used"] fft_result = result["fft_result"] mem_result = result["mem_result"] order_selected = result["order_selected"] order_info = result["order_info"] df_input = pd.DataFrame({ "k_A^-1": k, "signal_raw": signal_raw, f"signal_k^{args.korder:g}_weighted": signal_kweighted, "signal_centered": signal_centered, "hanning_window": window, "signal_used": signal_used, }) summary_items = [ "input_file", "sheet", "xcol", "ycol", "mode", "n_data", "npadding", "nfft_used", "dk [A^-1]", "relative_std_dk", "korder", "remove_mean", "hrange", "hwidth [A^-1]", "AR_order_selected", "order_method", "best_p_aic", "best_aic", "best_p_bic", "best_bic", "scan_parameter", "normalize", "R_axis_definition", "FFT_definition", ] summary_values = [ args.infile, args.sheet, args.xcol, args.ycol, args.mode, len(k), args.npadding, result["nfft"], dk, rel_std_dk, args.korder, args.remove_mean, "" if args.hrange is None else str(args.hrange), args.hwidth, order_selected if order_selected is not None else "", order_info.get("order_method", "") if order_info else "", order_info.get("best_p_aic", "") if order_info else "", order_info.get("best_aic", "") if order_info else "", order_info.get("best_p_bic", "") if order_info else "", order_info.get("best_bic", "") if order_info else "", "" if scan_param is None else scan_param, args.normalize, "EXAFS: R = pi * frequency, because chi(k) ~ sin(2 k R + phase)", "FT = rFFT(signal_used, nfft) * dk", ] with pd.ExcelWriter(outfile, engine="openpyxl") as writer: df_input.to_excel(writer, sheet_name="Input", index=False) if fft_result is not None: R_fft, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result peak_R_fft, peak_y_fft = find_main_peak(R_fft, ft_amp) df_fft = pd.DataFrame({ "R_A": R_fft, "FT_real": ft_real, "FT_imag": ft_imag, "FT_amplitude": ft_amp, "FT_power": ft_power, "FT_real_norm": normalize_curve(ft_real, args.normalize), "FT_imag_norm": normalize_curve(ft_imag, args.normalize), "FT_amplitude_norm": normalize_curve(ft_amp, args.normalize), "FT_power_norm": normalize_curve(ft_power, args.normalize), }) df_fft.to_excel(writer, sheet_name="FFT", index=False) summary_items += ["main_peak_R_FFT_amplitude [A]", "main_peak_FFT_amplitude"] summary_values += [peak_R_fft, peak_y_fft] if mem_result is not None: R_mem, freqs_mem_sample, mem_psd = mem_result peak_R_mem, peak_y_mem = find_main_peak(R_mem, mem_psd) df_mem = pd.DataFrame({ "Frequency_MEM_cycles_per_sample": freqs_mem_sample, "R_A": R_mem, "MEM_PSD": mem_psd, "MEM_PSD_norm": normalize_curve(mem_psd, args.normalize), }) df_mem.to_excel(writer, sheet_name="MEM", index=False) summary_items += ["main_peak_R_MEM [A]", "main_peak_MEM_PSD"] summary_values += [peak_R_mem, peak_y_mem] if scan_results: rows = [] scan_curve_columns = {} first_R = None for sr in scan_results: label = sr["label"] scan_value = sr["scan_value"] rargs = sr["args"] if args.mode == "fft": R, y, yname = y_from_fft_result(sr["fft_result"], args.fft_plot) elif args.mode == "mem": R, y, yname = y_from_mem_result(sr["mem_result"]) else: # In both mode, scan table uses FFT amplitude as primary if present. R, y, yname = y_from_fft_result(sr["fft_result"], args.fft_plot) y_norm = normalize_curve(y, args.normalize) peak_R, peak_y = find_main_peak(R, y) peak_R_norm, peak_y_norm = find_main_peak(R, y_norm) rows.append({ "scan_parameter": scan_param, "scan_value": scan_value, "label": label, "mode": args.mode, "y_name": yname, "order": sr["order_selected"], "npadding": rargs.npadding, "nfft": sr["nfft"], "korder": rargs.korder, "hrange": str(rargs.hrange), "hwidth": rargs.hwidth, "peak_R_A": peak_R, "peak_y": peak_y, "normalize": args.normalize, "peak_R_A_norm": peak_R_norm, "peak_y_norm": peak_y_norm, }) if first_R is None: first_R = R scan_curve_columns["R_A"] = R if len(R) == len(first_R) and np.allclose(R, first_R, rtol=0, atol=1e-12): scan_curve_columns[label] = y_norm if args.normalize != "none": scan_curve_columns[f"raw_{label}"] = y else: # Different npadding gives different R mesh. # Save R/y pair as separate columns. scan_curve_columns[f"R_A_{label}"] = R scan_curve_columns[f"Y_{label}"] = y_norm if args.normalize != "none": scan_curve_columns[f"Y_raw_{label}"] = y pd.DataFrame(rows).to_excel(writer, sheet_name="ScanSummary", index=False) pd.DataFrame(dict([(k2, pd.Series(v2)) for k2, v2 in scan_curve_columns.items()])).to_excel( writer, sheet_name="ScanCurves", index=False ) df_summary = pd.DataFrame({"item": summary_items, "value": summary_values}) df_summary.to_excel(writer, sheet_name="Summary", index=False)
# ============================================================ # Plot # ============================================================
[ドキュメント] def plot_results(args, k, signal_raw, result, scan_results=None, scan_param=None): """ 入力信号と窓関数、変換結果をプロットします。 スキャン結果が与えられた場合は、その曲線も重ねて表示します。 詳細説明: matplotlibを使用して2つのサブプロットを生成します。 上段のサブプロットは、生の信号、前処理された信号、およびハニング窓関数を表示します。 下段のサブプロットは、FFTまたはMEMの変換結果を表示します。 `scan_results`が提供されている場合、各スキャンにおける変換曲線がオーバーレイされます。 正規化オプション、R軸の範囲制限、対数スケールなどのプロットオプションがサポートされます。 `--savefig`が指定されている場合、プロットは画像ファイルとして保存されます。 :param args: コマンドライン引数を格納する名前空間オブジェクト。 :type args: argparse.Namespace :param k: k軸データ。 :type k: numpy.ndarray :param signal_raw: 生の信号データ。 :type signal_raw: numpy.ndarray :param result: 単一の変換結果を含む辞書。`run_transform_for_args`の戻り値。 :type result: dict :param scan_results: スキャンモードの場合の各スキャン結果のリスト。デフォルトはNone。 :type scan_results: Optional[list[dict]] :param scan_param: アクティブなスキャンパラメータの名前。デフォルトはNone。 :type scan_param: Optional[str] :returns: None """ fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(9, 8)) ax0 = axes[0] ax0.plot(k, signal_raw, label="raw signal", linewidth=1.2) ax0.plot(k, result["signal_used"], label="used signal", linewidth=1.2) ax0b = ax0.twinx() ax0b.plot(k, result["window"], linestyle="--", linewidth=1.0, label="Hanning window") ax0b.set_ylabel("window") ax0.set_xlabel("k [A$^{-1}$]") ax0.set_ylabel("signal") ax0.set_title("Input and processed signal") ax0.grid(True) lines0, labels0 = ax0.get_legend_handles_labels() lines1, labels1 = ax0b.get_legend_handles_labels() ax0.legend(lines0 + lines1, labels0 + labels1, loc="best") ax1 = axes[1] if scan_results: for sr in scan_results: label = sr["label"] if args.mode == "fft": R, y, _ = y_from_fft_result(sr["fft_result"], args.fft_plot) y_plot = normalize_curve(y, args.normalize) ax1.plot(R, y_plot, linewidth=1.2, label=label) elif args.mode == "mem": R, y, _ = y_from_mem_result(sr["mem_result"]) y_plot = normalize_curve(y, args.normalize) ax1.plot(R, y_plot, linewidth=1.2, label=label) else: R, y, _ = y_from_fft_result(sr["fft_result"], args.fft_plot) y_plot = normalize_curve(y, args.normalize) ax1.plot(R, y_plot, linewidth=1.0, label=f"FFT {label}") Rm, ym, _ = y_from_mem_result(sr["mem_result"]) ym_plot = normalize_curve(ym, args.normalize) ax1.plot(Rm, ym_plot, linestyle="--", linewidth=1.0, label=f"MEM {label}") ax1.set_title(f"EXAFS transform scan: {scan_param}") else: fft_result = result["fft_result"] mem_result = result["mem_result"] order_selected = result["order_selected"] if fft_result is not None: R_fft, ft_complex, ft_real, ft_imag, ft_amp, ft_power = fft_result if args.fft_plot == "realimag": ax1.plot(R_fft, normalize_curve(ft_real, args.normalize), label=normalized_label("FFT real", args.normalize), linewidth=1.2) ax1.plot(R_fft, normalize_curve(ft_imag, args.normalize), label=normalized_label("FFT imag", args.normalize), linewidth=1.2) y_for_peak = normalize_curve(ft_amp, args.normalize) elif args.fft_plot == "power": y_plot = normalize_curve(ft_power, args.normalize) ax1.plot(R_fft, y_plot, label=normalized_label("FFT power", args.normalize), linewidth=1.2) y_for_peak = y_plot else: y_plot = normalize_curve(ft_amp, args.normalize) ax1.plot(R_fft, y_plot, label=normalized_label("FFT amplitude", args.normalize), linewidth=1.2) y_for_peak = y_plot peak_R_fft, peak_y_fft = find_main_peak(R_fft, y_for_peak) if np.isfinite(peak_R_fft): ax1.axvline(peak_R_fft, linestyle="--", linewidth=0.8) if mem_result is not None: R_mem, freqs_mem_sample, mem_psd = mem_result mem_psd_plot = normalize_curve(mem_psd, args.normalize) ax1.plot(R_mem, mem_psd_plot, label=normalized_label(f"MEM PSD, order={order_selected}", args.normalize), linewidth=1.8) peak_R_mem, peak_y_mem = find_main_peak(R_mem, mem_psd_plot) if np.isfinite(peak_R_mem): ax1.axvline(peak_R_mem, linestyle=":", linewidth=0.8) ax1.set_title("EXAFS transform") ax1.set_xlabel("R [A]") ax1.set_ylabel("FT / PSD" if args.normalize == "none" else "Normalized FT / PSD") ax1.grid(True) ax1.legend() if args.yscale == "log": ax1.set_yscale("log") if args.xlim is not None: ax1.set_xlim(args.xlim[0], args.xlim[1]) plt.tight_layout() if args.savefig: plt.savefig(args.savefig, dpi=300) print(f"Figure saved to {args.savefig}") if not args.no_show: plt.show()
# ============================================================ # Main # ============================================================
[ドキュメント] def main(): """ スクリプトのメイン実行関数です。 詳細説明: コマンドライン引数を解析し、データの読み込み、k軸間隔の推定、 FFT/MEM変換の実行、パラメータスキャン処理、結果のExcelファイルへの保存、 およびMatplotlibによる結果のプロットを行います。 エラーが発生した場合は、適切なメッセージを出力し、プログラムを終了します。 :returns: なし :rtype: None """ parser = argparse.ArgumentParser( description=( "EXAFS-oriented FFT/MEM transform. " "Input column 0 is k [A^-1], column 1 is chi(k)-like signal." ) ) parser.add_argument("infile", help="Input file: .xlsx, .csv, .txt, or .dat") parser.add_argument( "preset_order", nargs="?", type=int, default=8, help="Fallback/fixed AR model order for MEM. Default: 8.", ) parser.add_argument("--mode", choices=["fft", "mem", "both"], default="fft") parser.add_argument("--sheet", default=0, help="Excel sheet name or index. Default: 0.") parser.add_argument("--xcol", type=int, default=0, help="k-axis column index, 0-based.") parser.add_argument("--ycol", type=int, default=1, help="signal column index, 0-based.") parser.add_argument( "--npadding", type=int, default=1024, help="FFT/MEM NFFT length. If smaller than data length, data length is used.", ) parser.add_argument( "--korder", type=float, default=0.0, help="Apply k^korder to input signal before transform. If input is already k^2 chi(k), use 0.", ) parser.add_argument( "--hrange", type=float, nargs=2, default=None, metavar=("KSTART", "KEND"), help="Hanning window k range [A^-1], e.g. --hrange 2 12.", ) parser.add_argument("--hwidth", type=float, default=0.5, help="Hanning taper width [A^-1].") parser.add_argument("--remove-mean", action="store_true", help="Subtract mean after k-weighting and before windowing.") parser.add_argument("--max-order", type=int, default=40, help="Maximum AR order tested by AIC/BIC.") parser.add_argument( "--order-method", choices=["bic", "aic", "preset"], default="preset", help="AR order selection method for MEM. Default: preset.", ) # Direct fixed-order option, more readable than positional preset_order. parser.add_argument( "--order", type=int, default=None, help="Fixed AR order for MEM. Overrides positional preset_order and --order-method.", ) # Scan parameters: START STEP N parser.add_argument("--scan_order", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan MEM order. Example: --scan_order 10 10 5 gives 10,20,30,40,50.") parser.add_argument("--scan_hmin", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan lower bound of --hrange.") parser.add_argument("--scan_hmax", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan upper bound of --hrange.") parser.add_argument("--scan_hwidth", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan Hanning taper width.") parser.add_argument("--scan_korder", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan k-weight order.") parser.add_argument("--scan_npadding", type=float, nargs=3, default=None, metavar=("START", "STEP", "N"), help="Scan zero-padding NFFT length.") parser.add_argument("--yscale", choices=["linear", "log"], default="linear") parser.add_argument("--fft-plot", choices=["amplitude", "power", "realimag"], default="amplitude") parser.add_argument( "--normalize", choices=["none", "max"], default="none", help="Normalize each plotted/saved comparison curve. 'max' divides each curve by max(abs(y)).", ) parser.add_argument("--xlim", type=float, nargs=2, default=None, help="R-axis plot range, e.g. --xlim 0 6.") parser.add_argument("--outfile", default="", help="Output Excel file. Default: auto-generated.") parser.add_argument("--savefig", default="", help="Save figure filename, e.g. result.png.") parser.add_argument("--no-show", action="store_true", help="Do not show matplotlib window.") args = parser.parse_args() if args.order is not None: args.preset_order = args.order args.order_method = "preset" try: args.sheet = int(args.sheet) except Exception: pass try: scan_param, scan_vals = get_active_scan(args) except Exception as e: print(f"Error in scan options: {e}") sys.exit(1) try: k, signal_raw = load_xy(args.infile, sheet=args.sheet, xcol=args.xcol, ycol=args.ycol) except Exception as e: print(f"Error loading input file: {e}") sys.exit(1) try: dk, rel_std_dk = estimate_dk(k) except Exception as e: print(f"Error estimating dk: {e}") sys.exit(1) print() print("Input data summary") print(f" file = {args.infile}") print(f" mode = {args.mode}") print(f" n_data = {len(k)}") print(f" k_min = {np.min(k):.10g} [A^-1]") print(f" k_max = {np.max(k):.10g} [A^-1]") print(f" dk = {dk:.10g} [A^-1]") print(f" rel std dk = {rel_std_dk:.3e}") print(f" scan_param = {scan_param}") print(f" scan_values = {scan_vals}") print(f" normalize = {args.normalize}") print() try: result = run_transform_for_args(args, k, signal_raw, dk, quiet=False) except Exception as e: print(f"Error calculating transform: {e}") sys.exit(1) scan_results = None if scan_param is not None: scan_results = [] print("Scan calculation") for value in scan_vals: try: run_args = apply_scan_value(args, scan_param, value, k) if scan_param == "order" and run_args.mode == "fft": print("WARNING: --scan_order has no effect in --mode fft. Use --mode mem or --mode both.") sr = run_transform_for_args(run_args, k, signal_raw, dk, quiet=True) sr["scan_value"] = value sr["label"] = f"{scan_param}={value:g}" scan_results.append(sr) if sr["mem_result"] is not None: Rm, _, ym = sr["mem_result"] pr, py = find_main_peak(Rm, ym) print(f" {sr['label']}: MEM peak R={pr:.6g} A, y={py:.6g}") if sr["fft_result"] is not None: Rf, _, _, _, ya, _ = sr["fft_result"] pr, py = find_main_peak(Rf, ya) print(f" {sr['label']}: FFT peak R={pr:.6g} A, y={py:.6g}") except Exception as e: print(f" {scan_param}={value}: ERROR: {e}") print() if args.outfile: outfile = args.outfile else: outfile = make_output_filename( args.infile, args.mode, result["order_selected"] if result["order_selected"] is not None else args.preset_order, args.korder, scan_param=scan_param, ) try: save_results_excel( outfile, args, k, signal_raw, dk, rel_std_dk, result, scan_results=scan_results, scan_param=scan_param, ) print(f"Results saved to {outfile}") except Exception as e: print(f"Error saving Excel file: {e}") plot_results(args, k, signal_raw, result, scan_results=scan_results, scan_param=scan_param) input("\nPress ENTER to terminate>>\n")
if __name__ == "__main__": main()