"""
概要: Scipyのminimize関数を用いたガウスピークフィッティングスクリプト。
詳細説明: このスクリプトは、与えられたデータに対してガウス関数をフィッティングするために
Scipyの最適化ルーチンminimizeを使用します。初期パラメータから開始し、
残差二乗和を最小化することで最適なピークパラメータ(振幅、中心、幅)を探索します。
結果はCSVファイルに保存され、リアルタイムでフィッティングの進行状況をグラフ表示できます。
関連リンク: :doc:`peakfit-scipy-minimize_usage`
"""
import sys
import csv
import numpy as np
from math import exp
from scipy.optimize import minimize
import matplotlib.pyplot as plt
#nelder-mead Downhill simplex
#powell Modified Powell
#cg conjugate gradient (Polak-Ribiere method)
#bfgs BFGS法
#newton-cg Newton-CG
#trust-ncg 信頼領域 Newton-CG 法
#dogleg 信頼領域 dog-leg 法
#L-BFGS-B’ (see here)
#TNC’ (see here)
#COBYLA’ (see here)
#SLSQP’ (see here)
#trust-constr’(see here)
#dogleg’ (see here)
#trust-exact’ (see here)
#trust-krylov’ (see here)
method = "cg"
maxiter = 100
tol = 1.0e-5
#==========================================
# Source parameters to be fitted
#==========================================
# Data parameters: I0, x0, w
peaks = [1.1, 0.4, 0.4]
nparams = len(peaks)
xrange = [-1.0, 2.0]
nx = 101
xstep = (xrange[1] - xrange[0]) / (nx - 1)
xd = []
for i in range(nx):
xd.append(xrange[0] + i * xstep)
yd = []
# optimization parameters
x0 = [1.3, 0.6, 0.1]
#==========================================
# Graph parameters
#==========================================
fplot = 1
ngdata = 51
xgmin = -4.0
xgmax = 4.0
ygmin = -4.0
ygmax = 4.0
tsleep = 0.3
#==========================================
# File configurations
#==========================================
initial_csv = 'initial.csv'
final_csv = 'final.csv'
conv_csv = 'convergence.csv'
if __name__ == "__main__":
argv = sys.argv
n = len(argv)
if n >= 2:
x0[0] = float(argv[1])
if n >= 3:
x0[1] = float(argv[2])
if n >= 4:
x0[2] = float(argv[3])
if n >= 5:
iprint_interval = int(argv[4])
#==========================================
# functions
#==========================================
[ドキュメント]
def save_csv(path, headerlist, datalist, is_print = 0):
"""
概要: 指定されたパスにCSV形式でデータを保存します。
詳細説明: ヘッダーとデータリストを受け取り、CSVファイルとして書き込みます。
データの各列は`datalist`の要素に対応し、行は各列の対応するインデックスのデータで構成されます。
`is_print`が1の場合、書き込まれるデータ行が標準出力にも表示されます。
:param path: str - 保存先のファイルパス。
:param headerlist: list - CSVファイルのヘッダー行のリスト。
:param datalist: list - 保存するデータのリスト(例: `[[col1_data], [col2_data], ...]`)。
各内部リストは1つの列のデータを含みます。
:param is_print: int - データをコンソールに出力するかどうかを示すフラグ (0:しない, 1:する)。
:returns: int - ファイル書き込みが成功した場合は1、失敗した場合は0。
"""
f = open(path, 'w')
if not f:
return 0
# print("len=", len(datalist), len(datalist[0]))
writer = csv.writer(f, delimiter=',', lineterminator='\n')
writer.writerow(headerlist)
for i in range(len(datalist[0])):
dlist = [datalist[id][i] for id in range(len(datalist))]
dliststr = joinf(dlist, "%12.8g", ", ")
if is_print:
print(" {:3d}: {}".format(i, dliststr))
writer.writerow(dlist)
f.close()
return 1
[ドキュメント]
def joinf(list, format, sep):
"""
概要: リストの要素を指定されたフォーマットと区切り文字で結合します。
詳細説明: 渡されたリストの各要素を指定された書式文字列でフォーマットし、
指定された区切り文字で連結して一つの文字列として返します。
これは主に、数値のリストを整形して出力するために使用されます。
:param list: list - 結合する数値のリスト。
:param format: str - 各要素に適用する書式文字列(例: `"%12.8g"`)。
:param sep: str - 要素間の区切り文字。
:returns: str - フォーマットされ結合された文字列。
"""
s = format % (list[0])
for i in range(1, len(list)):
s += sep + format % (list[i])
return s
[ドキュメント]
def ycal(x, params):
"""
概要: 与えられたx値とガウスピークパラメータに基づいてy値を計算します。
詳細説明: 複数のガウスピークの合計を計算します。
`params`リストは`[I0_1, x0_1, w_1, I0_2, x0_2, w_2, ...]`
の形式でピークパラメータを含みます。
各ガウスピークは`I0 * exp(-(A * (x - x0))^2)`で表され、
`A`は`0.832554611 / w`で計算されます。
`w`が非常に小さい(0.001未満)場合、数値安定性のために0.001にクリップされます。
:param x: float - y値を計算するx座標。
:param params: list - ガウスピークのパラメータのリスト。
`[I0_1, x0_1, w_1, I0_2, x0_2, w_2, ...]`の形式。
`I0`はピークの高さ、`x0`はピークの中心、`w`はピークの半値幅。
:returns: float - 計算されたy値。
"""
ret = 0.0
for ip in range(0, len(params), 3):
I0 = params[ip]
x0 = params[ip+1]
w = params[ip+2]
if w < 0.001:
w = 0.001
a = 0.832554611 / w
X = a * (x - x0)
ret += I0 * exp(-X * X)
return ret
[ドキュメント]
def ycal_list(xd, params):
"""
概要: x座標のリストに対してy値を計算し、リストとして返します。
詳細説明: 与えられたx座標の各点に対して`ycal`関数を呼び出し、
対応するy値のリストを生成します。これはプロットやデータ保存に利用されます。
:param xd: list - x座標のリスト。
:param params: list - ガウスピークのパラメータのリスト。
:returns: list - 各x座標に対応する計算されたy値のリスト。
"""
print("pa=", params)
y = []
for i in range(len(xd)):
y.append(ycal(xd[i], params))
return y
[ドキュメント]
def CalS2(params):
"""
概要: 実測データとモデルとの残差二乗和(S2)を計算します。
詳細説明: グローバル変数`xd`(x座標)と`yd`(実測yデータ)を使用し、
現在の`params`で計算されるモデルのy値(`yc`)との差の二乗和を計算します。
この関数は、Scipyの`minimize`関数における目的関数として利用されます。
:param params: list - 現在のガウスピークのパラメータ。
:returns: float - 計算された残差二乗和。
"""
global xd, yd, nx
S2 = 0.0
for i in range(nx):
yc = ycal(xd[i], params)
d = yd[i] - yc
S2 += d * d
return S2
[ドキュメント]
def diff1i(i, params):
"""
概要: 特定のパラメータに関する残差二乗和の偏導関数を計算します。
詳細説明: `CalS2`関数の特定のピークの特定のパラメータ(`I0`, `x0`, `w`のいずれか)に関する
解析的な偏導関数を計算します。この関数は、`diff1`関数から呼び出され、
Scipyの`minimize`関数のヤコビアンとして使用される勾配ベクトルを構築します。
`w`が非常に小さい場合、0.001にクリップされます。
`xd`, `yd`, `nx`はグローバル変数としてアクセスされます。
:param i: int - 偏導関数を計算するパラメータのグローバルインデックス。
`i % 3`で`I0` (0)、`x0` (1)、`w` (2) のいずれか、
`i // 3`で対応するピーク番号を特定します。
:param params: list - 現在のガウスピークのパラメータのリスト。
:returns: float - 指定されたパラメータに関する残差二乗和の偏導関数。
"""
global xd, yd, nx
ip = i // 3
idx = i % 3
I0 = params[ip*3]
x0 = params[ip*3+1]
w = params[ip*3+2]
ret = 0.0
for id in range(nx):
if w < 0.001:
w = 0.001
yc = ycal(xd[id], [I0, x0, w])
dy = yd[id] - yc
a = 0.832554611 / w
dx = a * (xd[id] - x0)
e = exp(-dx*dx)
if idx == 0: # I0
dif = e
elif idx == 1: # x0
dif = 2.0 * a * dx * I0 * e
elif idx == 2: # w
dif = 2.0 * dx * dx / w * I0 * e
else:
print("Error in diff1i: Invalid index (ivar={}, ipeak={}, diff_idx={})".format(i, ip, idx))
exit()
d = -2.0 * dif * dy
ret += d
return ret
[ドキュメント]
def diff1(params):
"""
概要: 残差二乗和の全パラメータに関する勾配ベクトル(ヤコビアン)を計算します。
詳細説明: `diff1i`関数を各パラメータに対して呼び出し、
残差二乗和の全パラメータに関する偏導関数からなるNumPy配列を構築します。
この関数はScipyの`minimize`関数にヤコビアン(`jac`引数)として渡され、
最適化アルゴリズムの効率を高めるために利用されます。
`nparams`はグローバル変数としてアクセスされます。
:param params: list - 現在のガウスピークのパラメータのリスト。
:returns: numpy.ndarray - 残差二乗和の全パラメータに関する勾配ベクトル。
"""
global nparams
df = np.empty(len(params))
for i in range(len(params)):
df[i] = diff1i(i, params)
return df
iter = 0
xiter = []
yfmin = []
figure = None
ax_fit = None
rawdata = None
inidata = None
fitdata = None
ax_conv = None
convdata = None
[ドキュメント]
def callback(xk):
"""
概要: 最適化の各イテレーションで呼び出され、進行状況を更新しグラフを再描画します。
詳細説明: Scipyの`minimize`関数の`callback`引数として使用されます。
現在のパラメータ`xk`に基づいて残差二乗和を計算し、
イテレーション回数と残差二乗和の履歴(`xiter`, `yfmin`)を更新します。
さらに、フィッティング曲線と収束曲線を示すグラフをリアルタイムで更新し、
最適化の進行状況を視覚的に表示します。
この関数は多数のグローバル変数(`xd`, `iter`, `figure`, `ax_fit`, `ax_conv`,
`fitdata`, `convdata`)にアクセスし、それらを変更します。
:param xk: numpy.ndarray - 現在の最適化パラメータの配列。
:returns: None
"""
global xd
global iter
global figure, ax_fit, ax_conv
global fitdata, convdata
fmin = CalS2(xk)
print("callback {}: xk={}".format(iter, xk))
print(" fmin={}".format(fmin))
iter += 1
xiter.append(iter)
yfmin.append(fmin)
convdata[0].set_data(xiter, yfmin)
ax_conv.set_xlim((0.0, max(xiter) + 1.0))
ax_conv.set_ylim((min(yfmin) * 0.8, max(yfmin) * 1.2))
yc = ycal_list(xd, xk)
fitdata[0].set_data(xd, yc)
plt.pause(0.2)
#==========================================
# Main routine
#==========================================
[ドキュメント]
def main():
"""
概要: ピークフィッティングプロセス全体を管理するメインルーチン。
詳細説明:
この関数は、以下のステップを実行してガウスピークフィッティングのワークフローを管理します。
1. フィッティングの対象となるシミュレートされたデータ(`yd`)を生成します。
2. 初期パラメータ(`x0`)に基づく初期フィッティング曲線(`yini`)を計算します。
3. 初期データとシミュレートされたデータを`initial_csv`ファイルに保存します。
4. `fplot`フラグが1に設定されている場合、matplotlibを用いて
生データ、初期フィット、およびフィッティングの進行状況(目的関数の収束)を
リアルタイムで表示するためのグラフを初期化します。
5. `scipy.optimize.minimize`関数を呼び出して最適化を実行します。
これには、目的関数`CalS2`、解析的なヤコビアン`diff1`、および
各イテレーションでグラフを更新する`callback`関数が使用されます。
6. 最適化が完了した後、最終的な最適化パラメータ(`res.x`)を用いて
最終的なフィッティング曲線(`yopt`)を計算し、
`final_csv`ファイルに生データ、初期フィット、最終フィットと共に保存します。
7. 最適化の収束履歴(イテレーションごとの目的関数の値)を`conv_csv`ファイルに保存します。
8. `fplot`が1の場合、グラフを閉じるためにユーザーからの入力を待ちます。
この関数は、多くのグローバル変数(`xd`, `yd`, `xiter`, `yfmin`, `figure`, `ax_fit`,
`ax_conv`, `fitdata`, `convdata`など)にアクセスし、それらを初期化または更新します。
:returns: None
"""
global xd, yd, xiter, yfmin
global figure, ax_fit, ax_conv
global fitdata, convdata
print("")
print("Peak fitting by python scipy.optimize")
yd = ycal_list(xd, peaks)
yini = ycal_list(xd, x0)
print("Initial data are saved to [{}]".format(initial_csv))
ret = save_csv(initial_csv, ['x', 'y(data)', 'y(ini)'], [xd, yd, yini])
if ret == 0:
print("Error: Can not write to [{}]".format(initial_csv))
if fplot == 1:
figure = plt.figure(figsize = (10, 5))
ax_fit = figure.add_subplot(1, 2, 1)
rawdata = ax_fit.plot(xd, yd, color = 'blue', linestyle = '', linewidth = 0.5,
fillstyle = 'full', marker = 'x', markersize = 5)
inidata = ax_fit.plot(xd, yini, color = 'black', linestyle = 'dashed', linewidth = 0.5)
fitdata = ax_fit.plot([], [], color = 'red', linestyle = '-', linewidth = 0.5)
ax_conv = figure.add_subplot(1, 2, 2)
ax_conv.set_yscale('log')
convdata = ax_conv.plot([], [], color = 'black', linestyle = '-', linewidth = 0.5,
fillstyle = 'full', marker = 'o', markersize = 5)
plt.pause(0.001)
res = minimize(CalS2, x0, jac=diff1, method=method, tol = tol, callback = callback,
options = {'maxiter':maxiter, "disp":True})
print(res)
yopt = ycal_list(xd, res.x)
print("Final data are saved to [{}]".format(final_csv))
ret = save_csv(final_csv, ['x', 'y(data)', 'y(ini)', 'y(opt)'], [xd, yd, yini, yopt])
if ret == 0:
print("Error: Can not write to [{}]".format(final_csv))
print("Convergence data are saved to [{}]".format(conv_csv))
ret = save_csv(conv_csv, ['iter', 'error'], [xiter, yfmin])
if ret == 0:
print("Error: Can not write to [{}]".format(conv_csv))
if fplot == 1:
print("Press ENTER to terminate:", end = '')
ret = input()
if __name__ == "__main__":
main()