import sys
import numpy as np
import pandas as pd
import scipy.signal
import matplotlib.pyplot as plt
import matplotlib.widgets as wg
from tklib.tkutils import replace_path, getarg, getintarg, getfloatarg, pint, pfloat
from tklib.tkparams import tkParams
from tklib.tkvariousdata import tkVariousData
from tklib.tkapplication import tkApplication
"""
スムージング: 異なる手法とパラメータによるデータ平滑化
概要:
異なる手法とパラメータを用いて時系列データや信号データを平滑化するスクリプトです。
Savitzky-Golayフィルター(多項式フィッティング)や移動平均などの手法を提供し、
データのノイズを低減し、潜在的な傾向を強調することを目的としています。
詳細説明:
このスクリプトは、指定された入力ファイルからデータを読み込み、
ユーザーが設定した平滑化パラメータ(窓幅、多項式次数、微分次数など)に基づいて
平滑化処理を実行します。結果はグラフとして表示され、オプションでExcelファイルに保存されます。
コマンドライン引数を通じて、処理モード、入力/出力ファイル、データ範囲、
平滑化手法とそのパラメータを柔軟に指定できます。
また、必要に応じてリサンプリング処理も実行可能です。
関連リンク:
:doc:`smoothing_usage`
"""
#=============================
# parameters
#=============================
[ドキュメント]
def usage(app):
"""
使用方法を表示します。
概要:
スクリプトのコマンドライン引数の利用方法を標準出力に表示します。
詳細説明:
この関数は、プログラムが不正な引数で呼び出された場合や、
ユーザーがヘルプを求めた場合に、期待される引数の形式とその意味を説明します。
:param app: tkApplication: tkApplicationのインスタンス。
"""
print("usage: python {} args".format(sys.argv[0]))
[ドキュメント]
def initialize(app):
"""
アプリケーションのパラメータを初期化します。
概要:
アプリケーションのコアパラメータオブジェクト `app.cparams` を初期設定します。
詳細説明:
`tkParams` オブジェクトを生成し、入力ファイル、処理モード、X/Y軸のラベル、
データ範囲、平滑化に使用する次数と窓幅、リサンプリング設定など、
プログラム実行に必要なデフォルト値を設定します。
:param app: tkApplication: tkApplicationのインスタンス。
"""
app.cparams = tkParams()
cparams = app.cparams
cparams.infile = 'xrd.xlsx'
cparams.mode = 'test'
cparams.xmin = None
cparams.xmax = None
cparams.xlabel = 0
cparams.ylabel = 1
cparams.norder = 2
cparams.nsmooth = 11
cparams.nincrement = 10
cparams.ndifferential = 0
cparams.do_resampling = 0
cparams.sampling_step = 0.02
cparams.template = "StandardGraph.xlsm"
cparams.figsize = [6, 6]
cparams.fontsize = 16
cparams.fontsize_s = 10
[ドキュメント]
def update_vars(app):
"""
コマンドライン引数に基づいてアプリケーションのパラメータを更新します。
概要:
`sys.argv` を解析し、提供されたコマンドライン引数で `app.cparams` の値を上書きします。
詳細説明:
スクリプト実行時にコマンドラインから渡される引数を読み取り、
対応するアプリケーションパラメータ(入力ファイル、X/Y軸、データ範囲、
平滑化メソッド、平滑化次数、窓幅、微分次数、リサンプリング設定など)を更新します。
引数が指定されていない場合は、`initialize` 関数で設定されたデフォルト値が維持されます。
:param app: tkApplication: tkApplicationのインスタンス。
"""
cparams = app.cparams
argv = sys.argv
n = len(argv)
if n >= 2:
cparams.mode = argv[1]
if n >= 3:
cparams.infile = argv[2]
if n >= 4:
cparams.xmin = pfloat(argv[3], defval = argv[3])
if n >= 5:
cparams.xmax = pfloat(argv[4], defval = argv[4])
if n >= 6:
cparams.xlabel = pint(argv[5], defval = argv[5])
if n >= 7:
cparams.ylabel = pint(argv[6], defval = argv[6])
if n >= 8:
cparams.method = argv[7]
if n >= 9:
cparams.norder = pint(argv[8], cparams.norder)
if n >= 10:
cparams.nsmooth = pint(argv[9], cparams.nsmooth)
if n >= 11:
cparams.nincrement = pint(argv[10], defval = cparams.nincrement)
if n >= 12:
cparams.ndifferential = pint(argv[11], cparams.ndifferential)
if n >= 13:
cparams.do_resampling = pint(argv[12], cparams.do_resampling)
if n >= 14:
cparams.sampling_step = pfloat(argv[13], cparams.sampling_step)
[ドキュメント]
def diff2forward(x, y, i):
"""
2点前方差分法により関数の導関数を計算します。
概要:
与えられたデータポイント `(x, y)` のインデックス `i` における1階導関数を計算します。
詳細説明:
導関数は `(y[i+1] - y[i]) / (x[i+1] - x[i])` の式で計算されます。
計算が可能な範囲 (0 <= i <= n-2) 外のインデックスが指定された場合は、空文字列を返します。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param i: int: 導関数を計算するデータポイントのインデックス。
:returns: float or str: 計算された導関数、または計算範囲外の場合は空文字列。
"""
n = len(x)
if(0 <= i <= n-2):
return (y[i+1] - y[i]) / (x[i+1] - x[i]);
return ''
[ドキュメント]
def diff2backward(x, y, i):
"""
2点後方差分法により関数の導関数を計算します。
概要:
与えられたデータポイント `(x, y)` のインデックス `i` における1階導関数を計算します。
詳細説明:
導関数は `(y[i] - y[i-1]) / (x[i] - x[i-1])` の式で計算されます。
計算が可能な範囲 (1 <= i <= n-1) 外のインデックスが指定された場合は、空文字列を返します。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param i: int: 導関数を計算するデータポイントのインデックス。
:returns: float or str: 計算された導関数、または計算範囲外の場合は空文字列。
"""
n = len(x)
if(1 <= i <= n-1):
return (y[i] - y[i-1]) / (x[i] - x[i-1]);
return ''
[ドキュメント]
def diff3(x, y, i):
"""
3点中心差分法により関数の導関数を計算します。
概要:
与えられたデータポイント `(x, y)` のインデックス `i` における1階導関数を計算します。
詳細説明:
導関数は `(y[i+1] - y[i-1]) / (2.0 * (x[i] - x[i-1]))` の式で計算されます。
計算が可能な範囲 (1 <= i <= n-2) 外のインデックスが指定された場合は、空文字列を返します。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param i: int: 導関数を計算するデータポイントのインデックス。
:returns: float or str: 計算された導関数、または計算範囲外の場合は空文字列。
"""
n = len(x)
if(1 <= i <= n-2):
return (y[i+1] - y[i-1]) / 2.0 / (x[i] - x[i-1]);
return ''
[ドキュメント]
def diff5(x, y, i):
"""
5点中心差分法により関数の導関数を計算します。
概要:
与えられたデータポイント `(x, y)` のインデックス `i` における1階導関数を計算します。
詳細説明:
より広範な5つのデータポイントを使用して導関数を計算します。
計算が可能な範囲 (2 <= i <= n-3) 外のインデックスが指定された場合は、空文字列を返します。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param i: int: 導関数を計算するデータポイントのインデックス。
:returns: float or str: 計算された導関数、または計算範囲外の場合は空文字列。
"""
n = len(x)
if(2 <= i <= n-3):
return (-y[i+2] + 8.0*y[i+1] - 8.0*y[i-1] + y[i-2]) / 12.0 / (x[i] - x[i-1]);
return ''
[ドキュメント]
def diff7(x, y, i):
"""
7点中心差分法により関数の導関数を計算します。
概要:
与えられたデータポイント `(x, y)` のインデックス `i` における1階導関数を計算します。
詳細説明:
より広範な7つのデータポイントを使用して導関数を計算します。
計算が可能な範囲 (3 <= i <= n-4) 外のインデックスが指定された場合は、空文字列を返します。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param i: int: 導関数を計算するデータポイントのインデックス。
:returns: float or str: 計算された導関数、または計算範囲外の場合は空文字列。
"""
n = len(x)
if(3 <= i <= n-4):
return (y[i+3] - 9.0*y[i+2] + 45.0*y[i+1]
- 45.0*y[i-1] + 9.0*y[i-2] - y[i-3]) / 60.0 / (x[i] - x[i-1]);
return ''
[ドキュメント]
def SmoothingBySimpleAverage(y, n):
"""
シンプルな移動平均法によってデータを平滑化します。
概要:
各データポイントに対して、指定された窓幅 `n` 内のデータの平均値を計算し、平滑化します。
詳細説明:
データ配列 `y` の各要素について、その要素を中心とした窓幅 `n` の範囲にある
全データポイントの平均値を新しい値として算出します。
窓の端点(データ配列の最初と最後)では、利用可能なデータのみが平均計算に使用されます。
:param y: list or np.ndarray: 平滑化するY座標のデータ配列。
:param n: int: 平滑化に使用する窓幅(ポイント数)。
:returns: list: 平滑化されたY座標のデータ配列。
"""
n2 = int(n / 2);
ndata = len(y);
ys = []
for i in range(0, ndata):
c = 0;
ys.append(0.0);
for k in range(i - n2, i + n2 + 1):
if k < 0 or k >= ndata:
continue
ys[i] += y[k]
c += 1
if c > 0:
ys[i] /= c;
else:
ys[i] = y[i]
return ys;
[ドキュメント]
def SmoothingByPolynomialFit(y, n):
"""
多項式フィッティング(サビツキー・ゴレイフィルターに類似)によってデータを平滑化します。
概要:
各データポイントの周囲 `n` 点の範囲で多項式をフィッティングし、データを平滑化します。
詳細説明:
この関数は、Savitzky-Golayフィルターの重み付け係数 `w23j` を用いて、
各データポイント `y[i]` をその周囲 `n` 点の加重平均に置き換えることで平滑化を行います。
これにより、ノイズを低減しつつ、元のデータの形状を保持する効果が期待できます。
窓の端点では、利用可能なデータのみが考慮されます。
:param y: list or np.ndarray: 平滑化するY座標のデータ配列。
:param n: int: 平滑化に使用する窓幅(ポイント数)。
:returns: list: 平滑化されたY座標のデータ配列。
"""
m = int(n / 2);
W23 = (4.0 * m * m - 1.0) * (2.0 * m + 3.0) / 3.0;
w23j = [0.0]*n
for j in range(-m, m+1):
w23j[j + m] = (3.0 * m * (m+1.0) - 1.0 - 5.0 * j * j) / W23
ndata = len(y)
ys = []
for i in range(0, ndata):
c = 0.0;
ys.append(0.0);
for j in range(-m, m+1):
k = i + j
if k < 0 or k >= ndata:
continue
ys[i] += w23j[j+m] * y[k]
c += w23j[j+m]
if c > 0:
ys[i] /= c
else:
ys[i] = y[i]
return ys;
[ドキュメント]
def test(app):
"""
データの平滑化処理を実行し、結果をプロットして保存します。
概要:
指定された入力ファイルからデータを読み込み、各種平滑化手法を適用し、
結果をグラフに表示し、Excelファイルに保存します。
詳細説明:
`app.cparams` に設定されたパラメータに基づいて、データの読み込み、
X/Y軸のフィルタリング(`xmin`, `xmax`)、そして平滑化処理(`simple` または `polynomial`)を実行します。
`cparams.mode` が 'test' の場合、複数の条件で平滑化を試行し、複数のサブプロットに表示します。
それ以外の場合は単一の平滑化処理を行い、結果をExcelファイルに保存します。
平滑化後のデータは、必要に応じてリサンプリングされます。
:param app: tkApplication: tkApplicationのインスタンス。
"""
cparams = app.cparams
cparams.logfile = app.replace_path(cparams.infile)
cparams.outfile = app.replace_path(cparams.infile, template = ["{dirname}", "{filebody}-smoothed.xlss"])
print(f"Open logfile [{cparams.logfile}]")
app.redirect(targets = ["stdout", cparams.logfile], mode = 'w')
print("")
print(f"Smoothing data in [{cparams.infile}]")
print("infile : ", cparams.infile)
print("mode : ", cparams.mode)
print("xlabel : ", cparams.xlabel)
print("ylabel : ", cparams.ylabel)
print(" x range : ", cparams.xmin, cparams.xmax)
print("norder : ", cparams.norder)
print("nsmooth : ", cparams.nsmooth)
print("nincrement: ", cparams.nincrement)
print("order of differentiation: ", cparams.ndifferential)
print(f"do_resampling: {cparams.do_resampling}")
print(f"sampling_step: {cparams.sampling_step}")
if '*** choose ***' in cparams.xlabel:
app.terminate(f"\nError: Choose x label\nTerminated.\n", usage = usage, pause = True)
if '*** choose ***' in cparams.ylabel:
app.terminate(f"\nError: Choose y label\nTerminated.\n", usage = usage, pause = True)
print("Read data from [{}]".format(cparams.infile))
datafile = tkVariousData(cparams.infile)
header, datalist = datafile.Read_minimum_matrix(close_fp = True)#, usage = usage)
xlabel, _xin = datafile.FindDataArray(cparams.xlabel, flag = 'i')
ylabel, _yin = datafile.FindDataArray(cparams.ylabel, flag = 'i')
if _xin is None:
app.terminate(f"Error: Can not find the data with [{cparams.xlabel}]", usage = usage, pause = True)
if _yin is None:
app.terminate(f"Error: Can not find the data with [{cparams.xlabel}]", usage = usage, pause = True)
x = []
y = []
for i in range(len(_xin)):
if not (cparams.xmin is None or cparams.xmin == '' or cparams.xmin == '*') and _xin[i] < cparams.xmin:
continue
if not (cparams.xmax is None or cparams.xmax == '' or cparams.xmax == '*') and cparams.xmax < _xin[i]:
continue
x.append(_xin[i])
y.append(_yin[i])
ndata = len(x)
print(" ndata = ", ndata)
#=============================
# prepare graph
#=============================
print("")
print("plot")
if cparams.mode == 'test':
fig, ax = plt.subplots(3, 3, figsize = cparams.figsize)
ax = ax.flatten()
markersize = 0.3
linewidth = 0.5
fontsize = cparams.fontsize_s
else:
fig, ax = plt.subplots(1, 1, figsize = cparams.figsize)
ax = [ax]
markersize = 1.0
linewidth = 1.0
fontsize = cparams.fontsize
for axis in ax:
axis.tick_params(labelsize = fontsize)
def save_data(method, norder, nsmooth, ndifferential, x, y, ys, xret, yret):
"""
平滑化されたデータをExcelファイルに保存します。
概要:
元のデータ、平滑化されたデータ、およびリサンプリングされたデータをExcel形式で出力します。
詳細説明:
指定された平滑化手法、多項式次数、窓幅、微分次数を含むファイル名で、
入力データ、平滑化されたデータ、リサンプリングされたデータを保存します。
出力ファイルパスは `app.replace_path` を使用して構築されます。
`tkVariousData().to_excel` メソッドを介して、指定されたテンプレートでデータが書き込まれます。
:param method: str: 平滑化手法の名前('simple'または'polynomial')。
:param norder: int: 多項式フィッティングの次数。
:param nsmooth: int: 平滑化に使用する窓幅(ポイント数)。
:param ndifferential: int: 微分の次数。
:param x: list or np.ndarray: オリジナルのX座標データ。
:param y: list or np.ndarray: オリジナルのY座標データ。
:param ys: list or np.ndarray: 平滑化されたY座標データ。
:param xret: list or np.ndarray: リサンプリングされたX座標データ。
:param yret: list or np.ndarray: リサンプリングされたY座標データ。
"""
ext_dict = {"method": method, "norder": norder, "nsmooth": nsmooth, "ndifferential": ndifferential}
if ndifferential == 0:
if method == 'simple':
outfile = app.replace_path(cparams.infile, template = ["{dirname}",
"{filebody}-{method}-{nsmooth}p-order{norder}-smoothed.xlsm"],
ext_dict = ext_dict)
else:
outfile = app.replace_path(cparams.infile, template = ["{dirname}",
"{filebody}-{method}-{nsmooth}p-smoothed.xlsm"],
ext_dict = ext_dict)
else:
if method == 'simple':
outfile = app.replace_path(cparams.infile, template = ["{dirname}",
"{filebody}-{method}-{nsmooth}p-order{norder}-smoothed-{ndifferential}-diff.xlsm"],
ext_dict = ext_dict)
else:
outfile = app.replace_path(cparams.infile, template = ["{dirname}",
"{filebody}-{method}-{nsmooth}p-smoothed-{ndifferential}-diff.xlsm"],
ext_dict = ext_dict)
print("Save calculated data to [{}]".format(outfile))
# df = pd.DataFrame(np.array([x, y, ys]).T, columns = ['x', 'y(input)', 'y(smoothen)'])
# df.to_excel(outfile, index = False)
tkVariousData().to_excel(outfile,
['x', 'y(input)', 'y(smoothen)', 'x(resampled)', 'y(resampled)'],
[x, y, ys, xret, yret], template = cparams.template)
def execute_smooth(idx, method, norder, nsmooth, ndifferential, x, y, cparams):
"""
指定されたパラメータで平滑化処理を実行し、結果をプロットします。
概要:
入力データに対し、指定された平滑化手法、次数、窓幅、微分次数を適用し、
平滑化されたデータを算出してプロットします。
詳細説明:
`method` が 'simple' の場合、シンプルな移動平均を適用します。
`method` が 'polynomial' の場合、`scipy.signal.savgol_filter` を用いて
Savitzky-Golayフィルターを適用します。この際、`norder` と `nsmooth` がチェックされます。
`ndifferential` が1以上の場合、微分処理が実行され、結果が異なる軸 (`twinx`) にプロットされます。
`cparams.do_resampling` が有効な場合、平滑化されたデータは `cparams.sampling_step` に基づいてリサンプリングされます。
処理されたデータは標準出力に表示され、`matplotlib.pyplot.ax` オブジェクトにプロットされます。
:param idx: int: プロットするサブプロットのインデックス。
:param method: str: 平滑化手法の名前('simple'または'polynomial')。
:param norder: int: 多項式フィッティングの次数。
:param nsmooth: int: 平滑化に使用する窓幅(ポイント数)。
:param ndifferential: int: 微分の次数。
:param x: list or np.ndarray: X座標のデータ配列。
:param y: list or np.ndarray: Y座標のデータ配列。
:param cparams: tkParams: アプリケーションのパラメータオブジェクト。
:returns: tuple: オリジナルのX座標、平滑化されたY座標、リサンプリングされたX座標、リサンプリングされたY座標のタプル。
"""
print("")
print(f"Smoothing data #{idx+1} by {method} for order={norder} with {nsmooth} points...")
ys = np.zeros(len(x))
if method == 'simple':
print("Smoothing by simple moving average")
print(f"nsmooth={nsmooth}")
# ys = SmoothingBySimpleAverage(y, nsmooth)
if ndifferential == 0:
# ys = SmoothingBySimpleAverage(y, nsmooth)
print("y=", y)
w = np.ones(nsmooth) / nsmooth
ys = np.convolve(y, w, mode = 'same')
else:
print("")
print("Differentiation is not implemented for simple moving average")
print("Use polynomial smoothing instead")
print("")
return ys
ax[idx].set_title(f"{nsmooth}p simple", fontsize = fontsize)
elif method == 'polynomial':
print("Smoothing by polynomial fit")
print(f"nsmooth={nsmooth} norder={norder}")
if nsmooth <= norder:
print(f"nsmooth={nsmooth} must be larger than norder={norder}. Skip.")
return None
else:
# ys = SmoothingByPolynomialFit(y, nsmooth)
ys = scipy.signal.savgol_filter(y, nsmooth, norder, deriv = ndifferential)
if ndifferential >= 1:
ys /= pow(x[1] - x[0], ndifferential)
ax[idx].set_title(f"{nsmooth}p {norder}-th order polynomial", fontsize = fontsize)
if cparams.do_resampling:
print(f" Resampling...")
#x, ysデータを補間し、xret[0] = x[0]からcparams.sampling_stepごとに、x[-1]を超えない範囲でデータを収集
xret = np.arange(x[0], x[-1] + (x[1] - x[0]) * 0.01, cparams.sampling_step)
yret = np.interp(xret, x, ys)
else:
xret = x
yret = ys
print()
print(f"{'x':10}\t{'y':10}\t{'ys':10}")
for i in range(0, ndata):
print(f"{x[i]:10.4g}\t{y[i]:10.4g}\t{ys[i]:10.4g}")
print()
print(f"{'xret':10}\t{'yret':10}")
for i in range(0, len(xret)):
print(f"{xret[i]:10.4g}\t{yret[i]:10.4g}")
# save_data(method, norder, nsmooth, ndifferential, x, y, ys)
ax[idx].plot(x, y, linestyle = 'none', marker = 'o', markersize = markersize)
if ndifferential == 0:
ax[idx].plot(xret, yret, label = "{}p {}".format(nsmooth, method), linewidth = linewidth)
else:
ax2 = ax[idx].twinx()
ax2.plot(xret, yret, label = "{}p {}".format(nsmooth, method), linewidth = linewidth, color = 'red')
if ndifferential <= 3:
s = ['First', 'Second', 'Third'][ndifferential - 1]
else:
s = f'{ndifferential}-th'
ax2.set_ylabel(f"{s} differential", fontsize = fontsize)
return x, ys, xret, yret
tasks = []
if cparams.mode == 'test':
for i in range(3):
tasks.append(['simple', cparams.norder, cparams.nsmooth + i * cparams.nincrement, cparams.ndifferential])
for i in range(3):
tasks.append(['polynomial', cparams.norder, cparams.nsmooth + i * cparams.nincrement, cparams.ndifferential])
for i in range(3):
tasks.append(['polynomial', cparams.norder + 2, cparams.nsmooth + i * cparams.nincrement, cparams.ndifferential])
for i in range(len(tasks)):
print(f"Smoothing with condition #{i+1}...")
execute_smooth(i, tasks[i][0], tasks[i][1], tasks[i][2], tasks[i][3], x, y, cparams)
else:
tasks = [[cparams.method, cparams.norder, cparams.nsmooth, cparams.ndifferential]]
print(f"Smoothing...")
xs, ys, xret, yret = execute_smooth(0, tasks[0][0], tasks[0][1], tasks[0][2], tasks[0][3], x, y, cparams)
ax[0].set_xlabel(xlabel, fontsize = fontsize)
ax[0].set_ylabel(ylabel, fontsize = fontsize)
save_data(cparams.method, cparams.norder, cparams.nsmooth, cparams.ndifferential, x, y, ys, xret, yret)
plt.tight_layout()
plt.pause(0.1)
app.terminate("Press ENTER to exit>>", usage = usage, pause = True)
exit()
[ドキュメント]
def main(app):
"""
アプリケーションのメイン処理を実行します。
概要:
`cparams.mode` に応じて適切な処理(テストまたはプロット)を呼び出します。
詳細説明:
この関数はプログラムのエントリーポイントの一つとして機能し、
設定されているモードに基づいて `test` 関数を実行します。
'test' または 'plot' モードが指定されている場合は、`test(app)` が呼び出されます。
それ以外の無効なモードが指定された場合は、エラーメッセージを表示してプログラムを終了します。
:param app: tkApplication: tkApplicationのインスタンス。
"""
cparams = app.cparams
if cparams.mode == 'test':
test(app)
elif cparams.mode == 'plot':
test(app)
else:
app.terminate(f"\nError: Invalide mode=[{cparams.mode}]\n", usage = usage, pause = True)
# print("")
# print(f"Error: Invalide mode=[{cparams.mode}]")
# print("")
# exit()
if __name__ == '__main__':
app = tkApplication()
print(f"Initialize parameters")
initialize(app)
print(f"Update parameters by command-line arguments")
update_vars(app)
main(app)