"""
peakfit.py - ピークフィッティングおよびピークサーチの機能を提供するモジュール。
このモジュールは、主にX線回折(XRD)データなどのスペクトルデータに対して、
ピークの検出、フィッティングパラメータの構築、およびモデル関数(ガウス関数、ローレンツ関数)
によるデータシミュレーションを行うための機能を含んでいます。
線形最小二乗法をサポートするための基底関数や、入出力データの処理、プロット機能も提供します。
関連リンク: :doc:`peakfit_usage` (もし利用可能なドキュメントがあれば)
"""
import os
import numpy as np
#from numpy import log, log10, sqrt, exp
import openpyxl
import pandas as pd
import matplotlib.pyplot as plt
#from functools import lru_cache
import peaksearch
infile = 'test/xrd.xlsx'
outfile = 'simulated.xlsx'
[ドキュメント]
def Gaussian(x, x0, whalf, A = None):
"""
ガウス関数を計算します。
詳細説明:
与えられたx値、ピーク中心、半値幅に基づきガウス関数 `A * exp(-((x - x0) / a)^2)` を計算します。
ここで `a = whalf / sqrt(ln2)` です。
`A` がNoneの場合、半値幅 `whalf` から、ピーク高さが1となるような係数 (0.469718639 / whalf) を自動計算します。
定数 `0.832554611` は `sqrt(ln2)` に相当します。`exp` は `numpy.exp` を想定しています。
:param x: float or numpy.ndarray: ガウス関数を評価するx値。
:param x0: float: ピークの中心位置。
:param whalf: float: ピークの半値幅。
:param A: float, optional: ピークの高さまたは強度に関連する係数。Noneの場合、自動計算されます。
:returns: float or numpy.ndarray: 計算されたガウス関数の値。
"""
#A = 1/whalf * sqrt(ln2 / pi)
if A is None:
A = 0.469718639 / whalf
#a = whalf / sqrt(ln2)
a = whalf / 0.832554611
X = (x - x0) / a
return A * np.exp(-X * X)
[ドキュメント]
def Lorentzian(x, x0, whalf, A = None):
"""
ローレンツ関数を計算します。
詳細説明:
与えられたx値、ピーク中心、半値幅に基づきローレンツ関数 `A / (1.0 + ((x - x0) / whalf)^2)` を計算します。
`A` がNoneの場合、半値幅 `whalf` から、ピーク高さが1となるような係数 (1.0 / (whalf * pi)) を自動計算します。
`pi` は円周率に相当します。`pi` は `numpy.pi` を想定しています。
:param x: float or numpy.ndarray: ローレンツ関数を評価するx値。
:param x0: float: ピークの中心位置。
:param whalf: float: ピークの半値幅。
:param A: float, optional: ピークの高さまたは強度に関連する係数。Noneの場合、自動計算されます。
:returns: float or numpy.ndarray: 計算されたローレンツ関数の値。
"""
#A = 1/whalf/pi
if A is None:
A = 1.0 / whalf / np.pi
X = (x - x0) / whalf
return A / (1.0 + X * X)
[ドキュメント]
def peak_func(x, I0, xc, w):
"""
単一のピーク関数(現在はローレンツ関数)の値を計算します。
詳細説明:
`I0` が `None` の場合、強度係数を含まないピーク形状(高さ1.0)を返します。
それ以外の場合、指定された強度 `I0` を乗算したピーク関数を返します。
現在の実装では `Lorentzian` 関数を使用しています。
:param x: float or numpy.ndarray: 関数を評価するx値。
:param I0: float or None: ピークの強度係数。Noneの場合、強度は1.0として扱われます。
:param xc: float: ピークの中心位置。
:param w: float: ピークの半値幅。
:returns: float or numpy.ndarray: 計算されたピーク関数の値。
"""
dx = (x - xc) / w
# y = Gaussian(x, xc, w, 1.0)
y = Lorentzian(x, xc, w, 1.0)
if I0 is None:
return y
else:
return I0 * y
[ドキュメント]
def lsqfunc(idx, x, const_names, const_values):
"""
線形最小二乗法の基底関数を返します。
詳細説明:
`idx` が `0` の場合、定数項(背景)の基底関数として `1.0` を返します。
それ以外の場合、指定されたインデックスに対応するピークの形状関数(強度1.0)を返します。
`const_values` には、ピークの中心 `xc` と半値幅 `w` のリストが含まれます。
:param idx: int: 基底関数のインデックス。0は定数項、1以降は各ピークに対応します。
:param x: float or numpy.ndarray: 基底関数を評価するx値。
:param const_names: list of str: 定数として扱われるパラメータの名前のリスト。例: ["xc", "w"]。
:param const_values: list of list of float: 定数として扱われるパラメータの値のリスト。例: [xc_list, w_list]。
:returns: float or numpy.ndarray or None: 計算された基底関数の値。
"""
if idx == 0:
return 1.0
else:
xc = const_values[0]
w = const_values[1]
return peak_func(x, None, xc[idx], w[idx])
return None
[ドキュメント]
def build_fit_params(xlist, ylist, options):
"""
ピークサーチの結果からフィッティングパラメータ変数の初期値を構築します。
詳細説明:
`peaksearch.peak_search` を用いて入力データからピークを検出し、
検出された各ピークに対して強度(I0)、中心(xc)、半値幅(w)のフィッティングパラメータを生成します。
背景定数(`bg_c0`)のパラメータも追加されます。
各パラメータには、変数名、単位、スケール、最適化ID、線形ID、初期値、初期変動量、最小値、最大値、ペナルティが設定されます。
:param xlist: list or numpy.ndarray: x値のリスト。
:param ylist: list or numpy.ndarray: y値のリスト。
:param options: dict: ピークサーチの設定を含む辞書。
"nsmooth": 平滑化ウィンドウサイズ (int)
"norder": Savitzky-Golayフィルタの多項式次数 (int)
"threshold": ピーク検出の閾値 (float)
"ydiff1_threshold": 1次微分に基づくピーク検出の閾値 (float)
:returns: tuple: フィッティングパラメータの各要素のリスト (varname, unit, pk_scale, optid, linid, x0, dx, kmin, kmax, kpenalty)。
varname (list of str): パラメータ名のリスト。
unit (list of str): パラメータ単位のリスト。
pk_scale (list of str): パラメータスケールのリスト。
optid (list of int): 最適化対象IDのリスト (1:対象, 0:対象外)。
linid (list of int): 線形パラメータIDのリスト (1:線形, 0:非線形)。
x0 (list of float): 初期値のリスト。
dx (list of float): 初期変動量のリスト。
kmin (list of float): 最小値のリスト。
kmax (list of float): 最大値のリスト。
kpenalty (list of float): ペナルティ値のリスト。
"""
nsmooth = options["nsmooth"]
norder = options["norder"]
threshold = options["threshold"]
ydiff1_threshold = options["ydiff1_threshold"]
print()
print(f"Peak search")
xpeaks, inf = peaksearch.peak_search(xlist, ylist, nsmooth, norder,
threshold, ydiff1_threshold, is_print = True)
ysmooth = inf['ysmooth']
print(f" peak list:")
for i, p in enumerate(xpeaks):
print(f" {i:03d}: xc={p[1]:8.4f} I0={ysmooth[p[0]]:10g} w={p[2]:8.4f}")
varname = ["bg_c0"]
unit = [ ""]
pk_scale = [ ""]
optid = [ 1]
linid = [ 0]
x0 = [ 0.0]
dx = [ 0.1]
kmin = [-1.0e10]
kmax = [ 1.0e10]
kpenalty = [ 1.0]
for i in range(len(xpeaks)):
varname.append (f"I0{i+1}")
unit.append ("")
pk_scale.append("")
optid.append (1)
linid.append (1)
x0.append (ysmooth[xpeaks[i][0]])
dx.append (1.0)
kmin.append (-1.0e10)
kmax.append ( 1.0e10)
kpenalty.append(1.0)
varname.append (f"xc{i+1}")
unit.append ("")
pk_scale.append("")
optid.append (1)
linid.append (0)
x0.append (xpeaks[i][1])
dx.append (0.5)
kmin.append (0.0)
kmax.append (180.0)
kpenalty.append(1.0)
varname.append (f"w{i+1}")
unit.append ("")
pk_scale.append("")
optid.append (1)
linid.append (0)
x0.append (xpeaks[i][2])
dx.append (0.2)
kmin.append (0.0)
kmax.append (100.0)
kpenalty.append(1.0)
return varname, unit, pk_scale, optid, linid, x0, dx, kmin, kmax, kpenalty
[ドキュメント]
def cal_linearpart(xlist, ylist):
"""
線形回帰に使用するyデータリストを返します。
詳細説明:
現在の実装では、入力された `ylist` をそのまま線形回帰の目的変数として返します。
将来的に、非線形成分を除去するなどの前処理を行うことが考えられます。
:param xlist: list or numpy.ndarray: x値のリスト。
:param ylist: list or numpy.ndarray: y値のリスト。
:returns: list or numpy.ndarray: 線形回帰に使用されるy値のリスト。
"""
return ylist
[ドキュメント]
def recover_pk_all(pk, ai_all):
"""
線形回帰で得られた係数を全フィッティングパラメータにマージして返します。
詳細説明:
`pk` は、非線形パラメータを含む全てのフィッティングパラメータのリストです。
`ai_all` は、線形回帰によって最適化された線形係数のリストです。
この関数は `pk` をコピーし、`ai_all` の値で線形回帰の対象となるパラメータ(背景定数とピーク強度)を更新します。
現在の実装では、`pk[0]` (背景定数) と `pk[1], pk[4], ...` (ピーク強度) が更新されます。
:param pk: list of float: 全フィッティングパラメータのリスト(初期値または前回の非線形最適化結果)。
:param ai_all: list of float: 線形回帰によって得られた線形係数のリスト。
:returns: list of float: 線形回帰の結果が反映された、更新後の全フィッティングパラメータのリスト。
"""
pk_all = pk.copy()
n_allparams = len(pk)
pk_all[0] = ai_all[0]
for i in range(1, n_allparams, 3):
pk_all[i] = ai_all[i // 3 + 1]
return pk_all
[ドキュメント]
def build_llsq_params(xlist, ylist, varname_all, pk_all, optid_all, linid_all):
"""
全フィッティングパラメータから線形回帰パラメータを抽出し、線形回帰を実行するためのデータ構造を構築します。
詳細説明:
線形回帰の対象となるパラメータ(線形係数)と、定数として扱われる非線形パラメータ
(ピーク中心 `xc` と半値幅 `w`)を分離し、線形最小二乗法で利用可能な形式に変換します。
`optid_combined` は最適化可能かつ線形なパラメータを識別します。
:param xlist: list or numpy.ndarray: x値のリスト。
:param ylist: list or numpy.ndarray: y値のリスト。
:param varname_all: list of str: 全てのフィッティングパラメータ名。
:param pk_all: list of float: 全てのフィッティングパラメータの値。
:param optid_all: list of int: 各パラメータが最適化の対象であるかを示すID (1:対象, 0:対象外)。
:param linid_all: list of int: 各パラメータが線形パラメータであるかを示すID (1:線形, 0:非線形)。
:returns: tuple: 線形回帰用のデータリスト (xlin_list, ylin_list, varname_lin, pk_lin, optid_lin, const_names, const_values)。
xlin_list (list or numpy.ndarray): 線形回帰に使用するx値。
ylin_list (list or numpy.ndarray): 線形回帰に使用するy値。
varname_lin (list of str): 線形パラメータ名のリスト。
pk_lin (list of float): 線形パラメータの初期値リスト。
optid_lin (list of int): 線形パラメータの最適化対象IDリスト。
const_names (list of str): 定数パラメータ名のリスト。
const_values (list of list of float): 定数パラメータの値リスト。
"""
ylin_list = cal_linearpart(xlist, ylist)
xlin_list = xlist
n_allparams = len(varname_all)
# optid_combined includes all fitting parameters
optid_combined = [int(oid and lid) for oid, lid in zip(optid_all, linid_all)]
varname_lin = [varname_all[0]]
pk_lin = [pk_all[0]]
xc_lin = [None]
w_lin = [None]
optid_lin = [optid_combined[0]]
for i in range(1, n_allparams, 3):
varname_lin.append(varname_all[i])
optid_lin.append(optid_combined[i])
xc_lin.append(pk_all[i + 1])
w_lin.append(pk_all[i + 2])
pk_lin.append(pk_all[i])
const_names = ["xc", "w"]
const_values = [xc_lin, w_lin]
return xlin_list, ylin_list, varname_lin, pk_lin, optid_lin, const_names, const_values
[ドキュメント]
def cal_ylist(app, xk_all, x_list, fit = None, run = True, print_level = 1):
"""
2次元リストの変数x_listから、2次元リストの関数値y_listを計算します。
詳細説明:
与えられたフィッティングパラメータ `xk_all` (背景定数、各ピークの強度、中心、半値幅) を使用して、
各x値に対応するモデルのy値を計算します。
複数のピークがある場合、それぞれのピーク関数と背景定数を加算して全体のモデルを構築します。
:param app: tkApplication: アプリケーションオブジェクト。
:param xk_all: list of float: 全てのフィッティングパラメータの値。
:param x_list: list of list of float or numpy.ndarray: x値の2次元リスト。通常は `[x_data]`。
:param fit: object, optional: フィットオブジェクト(現在は未使用)。
:param run: bool, optional: 実行フラグ(現在は未使用)。
:param print_level: int, optional: 出力レベル(現在は未使用)。
:returns: list of numpy.ndarray: 計算されたモデルのy値の2次元リスト。
"""
bgc0 = xk_all[0]
ylist = np.zeros(len(x_list[0]))
xc1 = xk_all[5]
print("xc1=", xc1)
for i, x in enumerate(x_list[0]):
ylist[i] = bgc0
for ip in range(1, len(xk_all), 3):
I0 = xk_all[ip]
xc = xk_all[ip + 1]
w = xk_all[ip + 2]
ylist[i] += peak_func(x, I0, xc, w)
return [ylist]
[ドキュメント]
def save_data(path, labels, data_list):
"""
データとラベルをExcelファイルに保存します。
詳細説明:
与えられたデータリストと対応するラベルを用いて、Pandas DataFrameを構築し、
指定されたパスにExcelファイルとして保存します。
DataFrameのインデックスは出力されず、ヘッダーは含まれます。
:param path: str: データを保存するExcelファイルのパス。
:param labels: list of str: 各列に対応するラベルのリスト。
:param data_list: list of list or numpy.ndarray: 保存するデータのリスト。各要素が列データに対応します。
:returns: None
"""
print()
print(f"Save data to [{path}]")
df = pd.DataFrame(np.array(data_list).T, columns = labels)
df.to_excel(path, index = False, header = True)
[ドキュメント]
def peak_search(app, cfg, mf):
"""
データに対してピークサーチを実行し、結果をプロットします。
詳細説明:
`read_input_data` 関数で入力データを読み込み、`peaksearch.peak_search` を呼び出して
ピークを自動的に検出します。
検出されたピーク(位置、強度、半値幅)は、元のデータとスムージングされたデータ、
および検出された各ピークの仮のフィット曲線と共にプロットされます。
Savitzky-Golayフィルタによるスムージングと1次微分に基づくピーク検出が利用されます。
:param app: tkApplication: アプリケーションオブジェクト。
:param cfg: tkParams: 設定パラメータを保持するオブジェクト。
`infile`, `mode`, `xlabel`, `ylabel`, `xmin`, `xmax`,
`norder`, `nsmooth`, `threshold`, `ydiff1_threshold`,
`figsize`, `fontsize`, `legend_fontsize` 属性を持ちます。
:param mf: object, optional: マスターフィットオブジェクト(現在は未使用のためNone)。
:returns: None
"""
krange = 10.0
cfg.figsize_test = [12, 8]
cfg.fontsize_test = 12
print("")
print(f"Peak search in the data [{cfg.infile}]")
print("mode : ", cfg.mode)
print("infile : ", cfg.infile)
print("xlabel : ", cfg.xlabel)
print("ylabel : ", cfg.ylabel)
print(" x range : ", cfg.xmin, cfg.xmax)
print()
xlabels, ylabels, xdatalist, ydatalist, wdatalist = read_input_data(app, infile, options = {})
'''
fit = mf.init_fit(app, cfg)
fit.x_labels, fit.y_labels, fit.xd_list, fit.yd_list, fit.w_list = mf.read_input_data(app, cfg.infile, cfg)
x = fit.xd_list[0]
y = fit.yd_list[0]
xlabel = fit.x_labels[0]
ylabel = fit.y_labels[0]
'''
xlabel = xlabels[0]
ylabel = ylabels[0]
x = xdatalist[0]
y = ydatalist[0]
ndata = len(x)
print("norder : ", cfg.norder)
print("nsmooth : ", cfg.nsmooth)
if cfg.nsmooth % 2 == 0:
cfg.nsmooth += 1
print(f" Warning: nsmooth must be odd. Changed to {cfg.nsmooth}")
print("threshold : ", cfg.threshold)
print("dy/dx threshold : ", cfg.ydiff1_threshold)
print(" ndata = ", ndata)
xpeaks, inf = peaksearch.peak_search(x, y, cfg.nsmooth, cfg.norder, cfg.threshold, cfg.ydiff1_threshold, is_print = True)
ysmooth = inf['ysmooth']
#=============================
# prepare graph
#=============================
print("")
print("plot")
ndata = len(x)
dx = x[1] - x[0]
def plot_input(ax_input):
"""
ピークサーチ結果をプロットする内部関数。
"""
maxI = max(y)
bar_range = [-0.05 * maxI, -0.01 * maxI]
ax_input.plot(x, y, label = 'input', linestyle = '', marker = 'o', markersize = 0.5, markerfacecolor = 'black', markeredgecolor = 'black')
ax_input.plot(ax_input.get_xlim(), [0.0, 0.0], linestyle = 'dashed', color = 'red', linewidth = 0.5)
ylim = ax_input.get_ylim()
for i in range(len(xpeaks)):
idx = xpeaks[i][0]
_x = x[idx]
_I = ysmooth[idx]
_w = xpeaks[i][2]
_Ihalf = _I / 2.0
nx = int(_w / dx * krange + 1.00001)
xx = [x[i1] for i1 in range(max([0, idx - nx]), min([idx + nx, ndata]))]
# a_g = _w / 0.832554611
# gf = [Gauss(xx[i1], _x, a_g, _I) for i1 in range(len(xx))]
gf = [peak_func(xx[i1], _I, _x, _w) for i1 in range(len(xx))]
ax_input.plot([_x, _x], bar_range, linestyle = '-', color = 'black', linewidth = 0.5)
ax_input.plot([_x - _w, _x + _w], [_Ihalf, _Ihalf], linestyle = '-', color = 'green', linewidth = 0.5)
ax_input.plot(xx, gf, linestyle = '-', color = 'red', linewidth = 0.5)
# ax_input.set_xlabel(xlabel, fontsize = cfg.fontsize)
ax_input.set_ylabel(ylabel, fontsize = cfg.fontsize)
ax_input.legend(fontsize = cfg.legend_fontsize)
fig, axes = plt.subplots(1, 1, sharex = 'all', figsize = cfg.figsize)
axes.tick_params(labelsize = cfg.fontsize)
plot_input(axes)
axes.set_xlabel(xlabel, fontsize = cfg.fontsize)
plt.tight_layout()
plt.pause(0.01)
app.terminate(pause = True)
[ドキュメント]
class tkParams():
"""
プログラム設定を保持するための空のクラスです。
詳細説明:
このクラスは、設定パラメータを属性として動的に追加するために使用されます。
例えば、`cfg.infile = 'data.xlsx'` のように利用されます。
"""
pass
[ドキュメント]
class tkApplication():
"""
アプリケーションの基本的な動作を模倣するクラスです。
詳細説明:
主にプログラムの終了動作を制御するために使用されます。
`terminate` メソッドは、ユーザーからの入力を待ってプログラムを一時停止させる機能を提供します。
"""
def __init__(self):
"""
tkApplicationクラスのコンストラクタ。
詳細説明:
特に初期化処理はありません。
"""
pass
[ドキュメント]
def terminate(self, pause = False):
"""
アプリケーションを終了または一時停止します。
詳細説明:
`pause` がTrueの場合、ユーザーがEnterキーを押すまでプログラムの実行を一時停止します。
これにより、プロットされたグラフなどを確認する時間を確保できます。
:param pause: bool, optional: Trueの場合、終了前にユーザー入力を待ちます。デフォルトはFalse。
:returns: None
"""
if pause:
input("Press ENTER to terminate>>")
[ドキュメント]
def main():
"""
メイン実行関数。ピークフィットテストプログラムの実行を制御します。
詳細説明:
`tkParams` と `tkApplication` のモックオブジェクトを初期化し、
設定パラメータを設定します。
その後、入力データの読み込み、ファイル保存、およびピークサーチ(とプロット)の
一連の処理を実行します。
"""
app = tkApplication()
cfg = tkParams()
cfg.infile = infile
cfg.mode = "plot"
cfg.xlabel = 0
cfg.ylabel = 1
cfg.norder = 5
cfg.nsmooth = 7
cfg.xmin = 20.0
cfg.xmax = 40.0
cfg.threshold = 3000
cfg.ydiff1_threshold = 1.0
cfg.figsize = (8, 6)
cfg.fontsize = 12
cfg.legend_fontsize = 12
print()
print("Peak fit test program / odata module")
print(f"infile={infile}")
print(f"outfile={outfile}")
xlabels, ylabels, xdatalist, ydatalist, wdatalist = read_input_data(app, infile, options = {})
save_data(outfile, xlabels + ylabels, xdatalist + ydatalist)
# plot_input(app, cfg, mf = None)
peak_search(app, cfg, mf = None)
if __name__ == '__main__':
main()