cms.ft.dft のソースコード

"""
概要: 離散フーリエ変換 (DFT) および高速フーリエ変換 (FFT) のデモンストレーションを行うスクリプト。

詳細説明:
このスクリプトは、乱数でわずかに変調された3つの異なる周波数の正弦波を合成したデータを生成します。
生成されたデータに対して、以下の3種類のフーリエ変換を実行し、そのパフォーマンスと結果を比較します。
1. 回転因子を事前に計算する最適化されたDFT (dft関数)
2. 各ステップで三角関数を計算する基本的なDFT (dft2関数)
3. NumPyライブラリのFFT実装 (fft関数)

各変換の実行時間を計測し、すべての変換結果と元のデータは 'dft.csv' というCSVファイルに出力されます。
コマンドライン引数でデータ点数 (ndata) を指定することができます。

関連リンク:
:doc:`dft_usage`
"""
import numpy as np
from numpy import sin, cos, tan, pi
import csv
import sys
from time import time


x0 = 0.0
x1 = 1.0
ndata = 1024

outfile = 'dft.csv'


if __name__ == "__main__":
    argv = sys.argv
    narg = len(argv)
    if narg >= 2:
        ndata = int(argv[1])


[ドキュメント] def usage(): """ 概要: スクリプトの正しい使用方法を標準出力に表示します。 詳細説明: プログラムの実行時にコマンドライン引数が不適切である場合に呼び出され、 正しい引数の形式と例を示します。 """ print("") print("Usage: python {} ndata".format(argv[0])) print(" ex: python {} {}".format(argv[0], ndata)) print("")
[ドキュメント] def terminate(): """ 概要: 使用方法を表示し、プログラムを終了します。 詳細説明: `usage()` 関数を呼び出してスクリプトの使用方法を標準出力に表示した後、 `exit()` 関数によってプログラムを強制終了させます。 通常、エラーや不適切な引数処理の際に使用されます。 """ usage() exit()
# three trigonometric functions with randomized frequencies # A: amplitude, f: frequency, p1: phase
[ドキュメント] def func(x): """ 概要: ランダムなノイズを含む複数の正弦波を合成した値を計算します。 詳細説明: 入力値 `x` に微小なランダムノイズを加えた後、異なる周波数、位相、振幅を持つ 3つの正弦波の合計値を計算して返します。これにより、シミュレートされた データに現実的な変動を与えます。 :param x: float 入力値 (時間または空間座標)。 :returns: float 複数の正弦波の合成値にノイズを加えたもの。 """ x += 0.03 * np.random.rand(); f1, p1, A1 = ( 1.5, pi/4.0, 1.0); f2, p2, A2 = ( 3.0, pi/3.0, 0.3); f3, p3, A3 = (10.0, pi/6.0, 0.5); return A1 * sin(2.0*pi * f1 * x + p1) \ + A2 * sin(2.0*pi * f2 * x + p2) \ + A3 * sin(2.0*pi * f3 * x + p3)
[ドキュメント] def fft(y): """ 概要: 入力信号の高速フーリエ変換 (FFT) を計算します。 詳細説明: NumPyライブラリに実装されている `np.fft.fft` 関数を使用して、 与えられた実数または複素数の信号 `y` の高速フーリエ変換を実行します。 :param y: numpy.ndarray フーリエ変換を行う入力信号の配列。 :returns: numpy.ndarray 入力信号のFFT結果 (複素数配列)。 """ return np.fft.fft(y)
[ドキュメント] def dft(n, x, y, mode): """ 概要: 離散フーリエ変換 (DFT) を計算します(回転因子を事前計算して最適化)。 詳細説明: この関数は、離散フーリエ変換を実行します。計算を最適化するために、 回転因子 `exp(i * 2 * pi * i * k / n)` の実部と虚部(`cos` と `sin`)を 事前に計算し、ループ内で再利用します。これにより、`dft2` と比較して 三角関数の計算回数を減らし、処理速度を向上させます。 `mode` が 'inverse' の場合は逆DFTを実行し、それ以外の場合は順DFTを実行します。 コードでは`mode`に整数`1`が渡される場合がありますが、その際は'inverse'と一致しないため、順DFTとして処理されます。 :param n: int データ点の総数。 :param x: numpy.ndarray 独立変数の配列 (時間または空間座標)。この関数では主にデータ範囲の計算に使用されます。 :param y: numpy.ndarray フーリエ変換を行う入力信号の配列。各要素は複素数であると仮定されます。 :param mode: Union[str, int] 変換モードを指定します。 'inverse' の場合、逆離散フーリエ変換の回転因子が適用されます。 それ以外の値(例: 'forward', `1` など)の場合、通常の離散フーリエ変換が実行されます。 :returns: tuple `int`: 変換後のデータ点の数 (n/2)。 `list`: 周波数成分の配列。 `list`: DFT結果の実部配列。 `list`: DFT結果の虚部配列。 """ if mode == 'inverse': C = -1.0 else: C = 1.0 f = [0.0]*n yr = [0.0]*n yi = [0.0]*n wr = [0.0]*n wi = [0.0]*n df = 1.0 / (x[n-1] - x[0]); wr[0] = 1.0 wi[0] = 0.0 phi = C * 2.0*pi / n wr0 = cos(phi) wi0 = sin(phi) wr[1] = wr0 wi[1] = wi0 for k in range(2, n): wr[k] = wr[k-1] * wr0 - wi[k-1] * wi0 wi[k] = wr[k-1] * wi0 + wi[k-1] * wr0 for k in range(0, int(n/2)): f[k] = df * (k+1) yr[k] = 0.0 yi[k] = 0.0 for i in range(0, n): ik = (i * k) % n wr0 = wr[ik]; wi0 = wi[ik]; # x*exp(2pik) = (R + i I)(cos(phi) + i sin(phi)) # Rcos(phi) - Isin(phi) + i(Rsin(phi) + Icos(phi)) yr[k] += y[i].real * wr0 - y[i].imag * wi0; yi[k] += y[i].real * wi0 + y[i].imag * wr0; return int(n / 2), f, yr, yi
[ドキュメント] def dft2(n, x, y, mode): """ 概要: 離散フーリエ変換 (DFT) を計算します(各ステップで三角関数を計算)。 詳細説明: この関数は、離散フーリエ変換を実行します。`dft` 関数とは異なり、 各計算ステップで回転因子 `exp(i * 2 * pi * i * k / n)` の実部と虚部 (`cos` と `sin`)を直接計算します。これはより直感的な実装ですが、 三角関数の計算オーバーヘッドが大きいため、通常 `dft` 関数よりも低速です。 `mode` が 'inverse' の場合は逆DFTを実行し、それ以外の場合は順DFTを実行します。 コードでは`mode`に整数`1`が渡される場合がありますが、その際は'inverse'と一致しないため、順DFTとして処理されます。 :param n: int データ点の総数。 :param x: numpy.ndarray 独立変数の配列 (時間または空間座標)。この関数では主にデータ範囲の計算に使用されます。 :param y: numpy.ndarray フーリエ変換を行う入力信号の配列。各要素は複素数であると仮定されます。 :param mode: Union[str, int] 変換モードを指定します。 'inverse' の場合、逆離散フーリエ変換の回転因子が適用されます。 それ以外の値(例: 'forward', `1` など)の場合、通常の離散フーリエ変換が実行されます。 :returns: tuple `int`: 変換後のデータ点の数 (n/2)。 `list`: 周波数成分の配列。 `list`: DFT結果の実部配列。 `list`: DFT結果の虚部配列。 """ if mode == 'inverse': C = -1.0 else: C = 1.0 f = [0.0]*n yr = [0.0]*n yi = [0.0]*n df = 1.0 / (x[n-1] - x[0]); for k in range(0, int(n/2)): f[k] = df * (k+1) yr[k] = 0.0 yi[k] = 0.0 for i in range(0, n): phi = C * 2.0*pi / n * i*k; cosp = cos(phi); sinp = sin(phi); # x*exp(2pik) = (R + i I)(cos(phi) + i sin(phi)) # Rcos(phi) - Isin(phi) + i(Rsin(phi) + Icos(phi)) yr[k] += y[i].real * cosp - y[i].imag * sinp; yi[k] += y[i].real * sinp + y[i].imag * cosp; return int(n / 2), f, yr, yi
[ドキュメント] def main(): """ 概要: スクリプトのメイン処理を実行し、データの生成、DFT/FFTの計算、および結果のファイル出力を行います。 詳細説明: この関数は以下の主要なステップを実行します。 1. グローバル変数 `ndata` に基づいて、`x0` から `x1` の範囲でデータを生成します。 各データ点 `x[i]` に対して `func(x[i])` を呼び出し、乱数で変調された合成波 `y[i]` を作成します。 2. 生成されたデータに対して、`dft` (回転因子最適化DFT)、`dft2` (基本的なDFT)、 および `fft` (NumPyによるFFT) の3種類のフーリエ変換を実行します。 3. 各変換の実行時間を計測し、その結果を標準出力に表示します。 `fft` は複数回実行され、平均時間が算出されます。 4. 元のデータ `x`, `y` と、各変換の実部および虚部の結果を `outfile` で指定された CSVファイル (`dft.csv`) に書き出します。 5. 最後に `terminate()` 関数を呼び出し、プログラムを終了します。 """ global x0, x1, ndata global csvfile global outfile x = np.zeros(ndata) y = np.zeros(ndata) print("Make data by func(x) with random scattering") xstep = (x1 - x0) / ndata print("x = ({}, {}, {})".format(x0, x1, xstep)) print("ndata={}".format(ndata)) for i in range(0, ndata): x[i] = x0 + i * xstep y[i] = func(x[i]) ndata = len(x) starttime = time() nDFT, fDFT, DFTr, DFTi = dft(ndata, x, y, 1) endtime = time() print("Perform DFT (using rotation factor, faster): {} sec".format(endtime - starttime)) print("") starttime = time() nDFT2, fDFT2, DFT2r, DFT2i = dft2(ndata, x, y, 1) endtime = time() print("Perform DFT2 (calculate sin/cos everytime, slow): {} sec".format(endtime - starttime)) print("") nFFTiter = 10000 starttime = time() for i in range(nFFTiter): FFT = fft(y) endtime = time() print("Perform FFT(fast): {} sec".format((endtime - starttime) / nFFTiter)) print("") print("Save FFTed data to [{}]".format(outfile)) f = open(outfile, 'w') fout = csv.writer(f, lineterminator='\n') fout.writerow(['x', 'y', 'FT(DFT).r', 'FT(DFT).i', 'FT(DFT2).r', 'FT(DFT2).i', 'FT(FFT).r', 'FT(FFT).i']) for i in range(0, int(ndata/2)): fout.writerow([x[i], y[i], DFTr[i], DFTi[i], DFT2r[i], DFT2i[i], FFT[i].real, FFT[i].imag]) terminate()
if __name__ == "__main__": main()