"""
FFT_smoothing.py: 高速フーリエ変換 (FFT) およびFFT平滑化ツール。
概要:
入力データに対して高速フーリエ変換 (FFT)、逆高速フーリエ変換 (IFFT)、またはFFTを利用した平滑化処理を実行するスクリプト。
詳細説明:
このスクリプトは、指定された入力ファイルから時系列データを読み込み、
ユーザーが選択したモード(FFT、IFFT、またはFFT平滑化)に基づいて処理を行います。
処理されたデータは、元のデータと比較可能な形でExcelファイルに保存され、
さらにmatplotlibを用いてグラフとして可視化されます。
FFT平滑化モードでは、指定された周波数範囲(kcut0からkcut1)外の成分を除去することで、
データのノイズを除去し、より滑らかな曲線を得ることができます。
関連リンク:
:doc:`FFT_smoothing_usage`
"""
import sys
import csv
import numpy as np
from numpy import sin, cos, tan, pi
#import ast
import pandas as pd
import matplotlib.pyplot as plt
from tklib.tkutils import replace_path, getarg, getintarg, getfloatarg, pint, pfloat
from tklib.tksci.tksci import kB, e
from tklib.tkvariousdata import tkVariousData
from tklib.tkapplication import tkApplication
from tklib.tkparams import tkParams
from tklib.tksci.tksci import asin, acos, atan, degcos, degsin, degtan, degacos, degasin, degatan, arcsin, arccos, arctan
from tklib.tksci.tksci import factorial, log10, gamma, combination, eVTonm, nmToeV, Bn
from tklib.tksci.tksci import h, h_bar, hbar, e, kB, NA, c, pi, pi2, torad, todeg, basee
from tklib.tksci.tksci import me, mp, mn
from tklib.tksci.tksci import u0, e0, a0, R, F, g, G
from tklib.tksci.tksci import HartreeToeV, RyToeV, KToeV, eVToK, JToeV, eVToJ, Debye
app = tkApplication()
cparams = tkParams()
cparams.mode = 'FFT smoothing'
cparams.infile = 'xrd.xls'
cparams.xmin = '*'
cparams.xmax = '*'
cparams.kcut0 = '*'
cparams.kcut1 = '*'
cparams.xlabel = 0
cparams.ylabel = 1
cparams.figsize = [12, 8]
cparams.fontsize = 12
cparams.legend_fontsize = 10
logfile = app.replace_path(cparams.infile)
print(f"Open logfile [{logfile}]")
app.redirect(targets = ["stdout", logfile], mode = 'w')
[ドキュメント]
def usage(app):
"""
スクリプトの利用方法を表示します。
概要:
この関数は、スクリプトが正しく実行されなかった場合や、
必要な引数が不足している場合に、標準出力に利用方法のメッセージを表示します。
:param app: tkApplicationオブジェクト。アプリケーションの終了処理に使用されます。
:type app: tkApplication
:returns: None
"""
print("")
print(f"Usage: python {sys.argv[0]} kcut0 kcut1")
print("")
[ドキュメント]
def update_vars(app):
"""
コマンドライン引数からパラメータを更新します。
概要:
`sys.argv` を解析し、`cparams` グローバルオブジェクトの各フィールドを更新します。
詳細説明:
引数が不足している場合は `app.terminate` を呼び出してスクリプトを終了します。
ファイルパス、X/Y軸の指定、X軸の計算範囲、FFTカットオフ周波数などが更新されます。
また、Excel出力用のテンプレートファイル名も設定されます。
:param app: tkApplicationオブジェクト。アプリケーションの終了処理に使用されます。
:type app: tkApplication
:returns: None
"""
argv = sys.argv
narg = len(argv)
if narg <= 2:
app.terminate("\n", usage = usage, pause = True)
cparams.mode = getarg(1, cparams.mode)
cparams.infile = getarg(2, cparams.infile)
cparams.xlabel = getarg(3, cparams.xlabel)
cparams.ylabel = getarg(4, cparams.ylabel)
cparams.xmin = getarg(5, cparams.xmin)
cparams.xmax = getarg(6, cparams.xmax)
cparams.kcut0 = getarg(7, cparams.kcut0)
cparams.kcut1 = getarg(8, cparams.kcut1)
cparams.template = "StandardGraph.xlsm"
[ドキュメント]
def common_func():
"""
データの読み込み、範囲指定、初期設定を行う共通関数。
概要:
指定された入力ファイルからデータを読み込み、X軸とY軸のデータを抽出し、計算対象範囲でフィルタリングします。
詳細説明:
この関数は、`cparams.infile` からデータを読み込み、`cparams.xlabel` と `cparams.ylabel` に基づいて
X軸とY軸のデータを特定します。`cparams.xmin` と `cparams.xmax` の設定に従い、
計算対象となるデータのX軸範囲をフィルタリングします。
フィルタリングされたデータの数、開始値、ステップ幅など、後続のFFT処理に必要な情報を準備して返します。
X軸またはY軸の指定が不適切な場合は、スクリプトを終了します。
:returns: tuple: 以下の要素を含むタプル。
- label_x (str): X軸のラベル。
- _x (list[float]): フィルタリング前の全てのX軸データ。
- label_y (str): Y軸のラベル。
- _y (list[float]): フィルタリング前の全てのY軸データ。
- n (int): フィルタリング後のデータ点の数。
- x0 (float): フィルタリング後のX軸データの開始値(通常は `x[0]`)。
- xstep (float): フィルタリング後のX軸データのステップ幅。
:rtype: tuple[str, list[float], str, list[float], int, float, float]
"""
print("")
print("========================================")
print(f"mode: {cparams.mode}")
print(f"infile: {cparams.infile}")
print(f"xlabel: {cparams.xlabel}")
print(f"ylabel: {cparams.ylabel}")
print(f"x range for calculation: {cparams.xmin} - {cparams.xmax}")
print(f"k range included for smoothing: {cparams.kcut0} - {cparams.kcut1}")
if '***' in cparams.xlabel:
app.terminate("\nxlabel should be chosen\n", usage = usage, pause = True)
if '***' in cparams.ylabel:
app.terminate("\nylabel should be chosen\n", usage = usage, pause = True)
print("")
print(f"Read [{cparams.infile}]")
datafile = tkVariousData(cparams.infile)
labels, datalist = datafile.Read_minimum_matrix(close_fp = True, usage = app.usage)
label_x, _x = datafile.FindDataArray(cparams.xlabel, flag = 'i')
label_y, _y = datafile.FindDataArray(cparams.ylabel, flag = 'i')
n_all = len(_x)
print("")
print("Input data:")
print("ndata:", n_all)
xmin = min(_x)
xmax = max(_x)
print(f" input x range: {xmin:10.4g} - {xmax:10.4g}")
cparams.xmin = pfloat(cparams.xmin, defval = xmin)
cparams.xmax = pfloat(cparams.xmax, defval = xmax)
x = []
y = []
for i in range(n_all):
if cparams.xmin <= _x[i] <= cparams.xmax:
x.append(_x[i])
y.append(_y[i])
n = len(x)
xstep = x[1] - x[0]
x0 = x[1]
x1 = x0 + xstep * (n-1)
print("")
print("Input data:")
print("ndata:", n)
print(f" x label: {label_x}")
print(f" y label: {label_y}")
print(f" x range included for caculation: {cparams.xmin:10.4g} - {cparams.xmax:10.4g}")
return label_x, _x, label_y, _y, n, x0, xstep
[ドキュメント]
def IFFT():
"""
入力データに対して逆高速フーリエ変換 (IFFT) を実行し、結果をプロット・保存します。
概要:
`common_func` で準備された時系列データに対して逆高速フーリエ変換 (IFFT) を適用し、その結果を可視化・保存します。
詳細説明:
NumPyの `np.fft.ifft` を用いて、入力データ `y` の逆FFTを計算します。
IFFT後のX軸(逆周波数軸)データも計算されます。
IFFT結果の実部と虚部を抽出し、元のデータ、IFFT結果と共にPandas DataFrameにまとめます。
このDataFrameは指定されたテンプレート形式でExcelファイルに出力されます。
最後に、元のデータとIFFT結果の実部・虚部をmatplotlibでグラフ表示し、ユーザーが結果を確認できるようにします。
:returns: None
"""
label_x, x, label_y, y, n, x0, xstep = common_func()
# Inverse FFT
F = np.fft.ifft(y)
# calculate FFTed x axis
xfrange = 1.0 / xstep
dxf = xfrange / (n-1)
xf = np.zeros(n)
for i in range(0, n):
xf[i] = i * dxf
print("")
print("x range : {} - {}".format(min(x), max(x)))
print("x(fft) range: {} - {}".format(min(xf), max(xf)))
Fr = [F[i].real for i in range(n)]
Fi = [F[i].imag for i in range(n)]
Fr = [F[i].real for i in range(n)] # 実際にはこの行は不要だが、既存コードのためそのまま
df = pd.DataFrame(np.array([x, y, xf, Fr, Fi, Fr]).T,
columns = ['x', 'y', 'x(ifft)', 'y(ifft).r', 'y(ifft).i', '|y(ifftr)|'])
outfile = app.replace_path(cparams.infile, template = ["{dirname}", "{filebody}-ifft.xlsx"])
print("")
print(f"Save to [{outfile}]")
# df.to_excel(outfile, index = False)
data_list = df.values.tolist()
tkVariousData().to_excel(outfile, list(df.columns), list(map(list, zip(*data_list))), template = cparams.template)
print("")
print("Plot")
fig, axes = plt.subplots(1, 2, figsize = cparams.figsize)
axes = axes.flatten()
# ax2 = axes[1].twinx()
axes[0].tick_params(labelsize = cparams.fontsize)
axes[1].tick_params(labelsize = cparams.fontsize)
axes[0].plot(x, y, label = 'input data', color = 'black', linewidth = 0.5)
axes[0].set_xlabel(label_x, fontsize = cparams.fontsize)
axes[0].set_ylabel(label_y, fontsize = cparams.fontsize)
axes[0].legend(fontsize = cparams.legend_fontsize)
ins1 = axes[1].plot(xf, Fr, label = 'FT real', color = 'black', linewidth = 0.5)
ins2 = axes[1].plot(xf, Fi, label = 'FT imag', color = 'red', linewidth = 0.5)
ins3 = axes[1].plot(xf, Fr, label = '|FT|', color = 'blue', linewidth = 0.5) # 既存コードのためFrをそのまま使用
axes[1].set_xlabel('k', fontsize = cparams.fontsize)
axes[1].set_ylabel('Inverse $FT^*$', fontsize = cparams.fontsize)
axes[1].legend(fontsize = cparams.legend_fontsize)
plt.tight_layout()
plt.pause(0.001)
app.terminate("\n", usage = usage, pause = True)
[ドキュメント]
def FFT():
"""
入力データに対して高速フーリエ変換 (FFT) を実行し、結果をプロット・保存します。
概要:
`common_func` で準備された時系列データに対して高速フーリエ変換 (FFT) を適用し、その結果を可視化・保存します。
詳細説明:
NumPyの `np.fft.fft` を用いて、入力データ `y` のFFTを計算します。
FFT後のX軸(周波数軸)データも計算されます。
FFT結果の実部と虚部を抽出し、元のデータ、FFT結果と共にPandas DataFrameにまとめます。
このDataFrameは指定されたテンプレート形式でExcelファイルに出力されます。
最後に、元のデータとFFT結果の実部・虚部をmatplotlibでグラフ表示し、ユーザーが結果を確認できるようにします。
:returns: None
"""
label_x, x, label_y, y, n, x0, xstep = common_func()
# FFT
F = np.fft.fft(y)
# calculate FFTed x axis
xfrange = 1.0 / xstep
dxf = xfrange / (n-1)
xf = np.zeros(n)
for i in range(0, n):
xf[i] = i * dxf
print("")
print("x range : {} - {}".format(min(x), max(x)))
print("x(fft) range: {} - {}".format(min(xf), max(xf)))
Fr = [F[i].real for i in range(n)]
Fi = [F[i].imag for i in range(n)]
Fr = [F[i].real for i in range(n)] # 実際にはこの行は不要だが、既存コードのためそのまま
df = pd.DataFrame(np.array([x, y, xf, Fr, Fi, Fr]).T,
columns = ['x', 'y', 'x(fft)', 'y(fft).r', 'y(fft).i', '|y(fftr)|'])
outfile = app.replace_path(cparams.infile, template = ["{dirname}", "{filebody}-fft.xlsx"])
print("")
print(f"Save to [{outfile}]")
# df.to_excel(outfile, index = False)
data_list = df.values.tolist()
tkVariousData().to_excel(outfile, list(df.columns), list(map(list, zip(*data_list))), template = cparams.template)
print("")
print("Plot")
fig, axes = plt.subplots(1, 2, figsize = cparams.figsize)
axes = axes.flatten()
# ax2 = axes[1].twinx()
axes[0].tick_params(labelsize = cparams.fontsize)
axes[1].tick_params(labelsize = cparams.fontsize)
axes[0].plot(x, y, label = 'input data', color = 'black', linewidth = 0.5)
axes[0].set_xlabel(label_x, fontsize = cparams.fontsize)
axes[0].set_ylabel(label_y, fontsize = cparams.fontsize)
axes[0].legend(fontsize = cparams.legend_fontsize)
ins1 = axes[1].plot(xf, Fr, label = 'FT real', color = 'black', linewidth = 0.5)
ins2 = axes[1].plot(xf, Fi, label = 'FT imag', color = 'red', linewidth = 0.5)
ins3 = axes[1].plot(xf, Fr, label = '|FT|', color = 'blue', linewidth = 0.5) # 既存コードのためFrをそのまま使用
axes[1].set_xlabel('k', fontsize = cparams.fontsize)
axes[1].set_ylabel('$FT^*$', fontsize = cparams.fontsize)
axes[1].legend(fontsize = cparams.legend_fontsize)
plt.tight_layout()
plt.pause(0.001)
app.terminate("\n", usage = usage, pause = True)
[ドキュメント]
def FFT_smoothing():
"""
入力データに対してFFT平滑化を適用し、結果をプロット・保存します。
概要:
`common_func` で準備された時系列データに対してFFT平滑化処理を適用し、その結果を可視化・保存します。
詳細説明:
まず、入力データ `y` に対して高速フーリエ変換 (FFT) を実行します。
次に、`cparams.kcut0` から `cparams.kcut1` で指定された周波数範囲外のFFT成分をゼロに設定することで、
フィルタリングされたスペクトルを生成します。
このフィルタリングされたスペクトルに対して逆高速フーリエ変換 (IFFT) を実行することで、
ノイズが除去された平滑化データ `fs` が得られます。
元のデータ、FFT結果、フィルタリングされたFFT結果、平滑化されたデータの実部・虚部・絶対値が
Pandas DataFrameにまとめられ、指定されたテンプレート形式でExcelファイルに出力されます。
最終的に、元のデータと平滑化されたデータ、FFT結果とフィルタリング後のFFT結果を
複数のサブプロットにわたってグラフ表示し、処理前後の変化を視覚的に確認できるようにします。
:returns: None
"""
label_x, x, label_y, y, n, x0, xstep = common_func()
# FFT
F = np.fft.fft(y)
# calculate FFTed x axis
xfrange = 1.0 / xstep
dxf = xfrange / (n-1)
xf = np.zeros(n)
for i in range(0, n):
xf[i] = i * dxf
print("")
minxf = min(xf)
maxxf = max(xf)
print("x range : {} - {}".format(min(x), max(x)))
print("x(fft) range: {} - {}".format(minxf, maxxf))
cparams.kcut0 = pfloat(cparams.kcut0, defval = minxf)
cparams.kcut1 = pfloat(cparams.kcut1, defval = maxxf)
print(f"k range included for smoothing : {cparams.kcut0:10.4g} - {cparams.kcut1:10.4g}")
# cut FT data outside [lf, hf] for smoothing
# note the FFTed data is symmetric for real part and anti-symmetric for imagenary part with respect to n/2
# LF data are near ix = 0 and ix = n - 1 HF data are around n/2
Fs = np.zeros(n, dtype = 'complex')
for i in range(0, int(n/2)):
if cparams.kcut0 <= xf[i] <= cparams.kcut1:
Fs[i] = F[i]
Fs[n - i - 1] = F[n - i - 1]
else:
# print("cut: {} {}".format(i, xf[i]))
Fs[i] = 0.0
Fs[n - i - 1] = 0.0
# iFFT
fs = np.fft.ifft(Fs)
Fr = [F[i].real for i in range(n)]
Fi = [F[i].imag for i in range(n)]
Fsr = [Fs[i].real for i in range(n)]
Fsi = [Fs[i].imag for i in range(n)]
fsr = [fs[i].real for i in range(n)]
fsi = [fs[i].imag for i in range(n)]
fsabs = [abs(fs[i]) for i in range(n)]
df = pd.DataFrame(np.array([x, y, xf, Fr, Fi, Fsr, Fsi, fsr, fsi, fsabs]).T,
columns = ['x', 'y', 'x(fft)', 'y(fft).r', 'y(fft).i',
'ys(fft,cut).r', 'ys(fft,cut).i',
'ys(ifft).r', 'ys(ifft).i', '|ys(ifft)|']
)
outfile = app.replace_path(cparams.infile, template = ["{dirname}", "{filebody}-fft-smoothing.xlsx"])
print("")
print(f"Save to [{outfile}]")
# df.to_excel(outfile, index = False)
data_list = df.values.tolist()
tkVariousData().to_excel(outfile, list(df.columns), list(map(list, zip(*data_list))), template = cparams.template)
print("")
print("Plot")
fig, axes = plt.subplots(2, 2, figsize = cparams.figsize)
axes = axes.flatten()
# ax2 = axes[1].twinx()
axes[0].tick_params(labelsize = cparams.fontsize)
axes[1].tick_params(labelsize = cparams.fontsize)
axes[2].tick_params(labelsize = cparams.fontsize)
axes[3].tick_params(labelsize = cparams.fontsize)
axes[0].plot(x, y, label = 'input data', color = 'black', linewidth = 0.5)
axes[0].plot(x, fsr, label = 'FT+IFT real', color = 'red', linewidth = 0.5)
axes[0].set_xlabel(label_x, fontsize = cparams.fontsize)
axes[0].set_ylabel(label_y, fontsize = cparams.fontsize)
axes[0].legend(fontsize = cparams.legend_fontsize)
ins1 = axes[1].plot(xf, Fr, label = 'FT real', color = 'red', linewidth = 0.5)
ins2 = axes[1].plot(xf, Fi, label = 'FT imag', color = 'blue', linewidth = 0.5)
axes[1].set_xlabel('k', fontsize = cparams.fontsize)
axes[1].set_ylabel('$FT^*$ (real)', fontsize = cparams.fontsize)
# ax2.set_ylabel ('$FT^*$ (imag)', fontsize = cparams.fontsize)
# ins = ins1 + ins2 + ins3 + ins4
# axes[1].legend(ins, [l.get_label() for l in ins], fontsize = cparams.legend_fontsize, loc = 'upper center') #loc = 'best')
axes[1].legend(fontsize = cparams.legend_fontsize)
axes[2].plot(x, fsr, label = 'FT+IFT real', color = 'red', linewidth = 0.5)
axes[2].plot(x, fsi, label = 'FT+IFT imag', color = 'blue', linewidth = 0.5)
axes[2].plot(x, fsabs, label = '|FT+IFT|', color = 'pink', linewidth = 0.5)
axes[2].set_xlabel(label_x, fontsize = cparams.fontsize)
axes[2].set_ylabel(label_y, fontsize = cparams.fontsize)
axes[2].legend(fontsize = cparams.legend_fontsize)
ins3 = axes[3].plot(xf, Fsr, label = 'FT real(cut)', color = 'red', linewidth = 0.5)
ins4 = axes[3].plot(xf, Fsi, label = 'FT imag(cut)', color = 'blue', linewidth = 0.5)
axes[3].set_xlabel('k', fontsize = cparams.fontsize)
axes[3].set_ylabel('$FT^*$ (real)', fontsize = cparams.fontsize)
axes[3].legend(fontsize = cparams.legend_fontsize)
plt.tight_layout()
plt.pause(0.001)
app.terminate("\n", usage = usage, pause = True)
[ドキュメント]
def main():
"""
スクリプトのエントリーポイント。
概要:
スクリプトの実行開始時に呼び出され、コマンドライン引数を解析し、適切なFFT関連関数を呼び出します。
詳細説明:
`update_vars` 関数を呼び出して、コマンドライン引数からグローバルパラメータ `cparams` を更新します。
その後、`cparams.mode` の値に基づいて、`FFT_smoothing`、`FFT`、`IFFT` のいずれかの関数を実行します。
もし `cparams.mode` に未実装のモードが指定された場合は、エラーメッセージを表示してスクリプトを終了します。
:returns: None
"""
print("")
print( "#######################################")
print(f"# {sys.argv[0]}")
print( "#######################################")
update_vars(app)
cparams.print_parameters()
if cparams.mode == 'FFT smoothing':
FFT_smoothing()
elif cparams.mode == 'FFT':
FFT()
elif cparams.mode == 'IFFT':
IFFT()
else:
app.terminate(f"\nmode [{cparams.mode}] is not implemented.\n", usage = usage, pause = True)
if __name__ == "__main__":
main()