cms.ft.mem のソースコード

"""
MEM.py - 最小エントロピー法 (Maximum Entropy Method) によるスペクトル分析

概要:
このモジュールは、Burg法を用いた最小エントロピー法により、時系列データからパワースペクトル密度を計算します。
また、模擬データ生成、MEM分析の実行、結果のプロット機能を提供します。

詳細説明:
本スクリプトは、時系列データに対してMEM分析を適用し、そのパワースペクトル密度を推定します。
Burg法を使用してARモデルの係数を導出し、それらの係数からスペクトルを計算します。
レビンソン・ダービン再帰アルゴリズムが内部で使用されます。
分析結果はコンソールに出力され、matplotlibを用いて入力時系列とMEMスペクトルがグラフとして表示されます。

関連リンク:
:doc:`mem_usage`
"""
import sys
from math import pi, sin, cos, tan, exp
import random
import numpy as np
from matplotlib import pyplot as plt


#x0, x1, nData = 0, 1.0, 128
x0, x1, nData = 0, 1.0, 1024
WG = 0.02
IgnoreZero = 0
ConvFunc   = 'Gaussian'

mmx  = 38
lmax = 50


[ドキュメント] def MEMSub(nmax, mmx, dt, pg, pf, pspec, pm): """ MEMスペクトルを計算する。 詳細説明: 最小エントロピー法(MEM)におけるスペクトル計算の補助関数。 Burg法で得られた予測係数を用いてパワースペクトル密度を算出します。 周波数軸 `pf` とパワースペクトル `pspec` はこの関数内で計算され更新されます。 Parameters ---------- :param nmax: int 時系列データの長さ。 :param mmx: int MEMモデルの最大次数(予測係数の数)。 :param dt: float サンプリング間隔。 :param pg: numpy.ndarray Burg法で計算された予測係数 (AR係数)。 :param pf: numpy.ndarray 計算される周波数軸の値を格納する配列。この関数内で更新される。 :param pspec: numpy.ndarray 計算されるパワースペクトル密度を格納する配列。この関数内で更新される。 :param pm: float 最後の予測誤差電力。 Returns ------- :returns: None `pf` と `pspec` はin-placeで更新されるため、明示的な戻り値はない。 """ f0 = 1.0 / dt / (nmax - 1.0) for i in range(1, nmax+1): pf[i] = f0 * i sumr = 1.0 sumi = 0.0 cr = 0.0 ci = 2.0 * pi * pf[i] * dt for j in range(1, mmx+1): sumr += pg[j] * cos(j * ci) sumi += pg[j] * sin(j * ci) pspec[i] = 2.0 * pm * dt / (sumr * sumr + sumi * sumi)
[ドキュメント] def MEMLevins(nmax, pg, pm, pb1, pb2, pa, m): """ レビンソン・ダービン再帰アルゴリズムの一部を実装する。 詳細説明: MEMのBurg法における予測誤差フィルタ係数 (`pg`) を計算するために使用される、 レビンソン・ダービン再帰アルゴリズムの主要なステップを実装します。 これは、ARモデルの次数を1ずつ増やしながら係数を効率的に計算する方法です。 Parameters ---------- :param nmax: int 時系列データの長さ。 :param pg: numpy.ndarray 計算される予測係数(AR係数)を格納する配列。この関数内で更新される。 :param pm: float 現在の予測誤差電力。この関数内で更新される。 :param pb1: numpy.ndarray 前方予測誤差の一時的なバッファ。この関数内で更新される。 :param pb2: numpy.ndarray 後方予測誤差の一時的なバッファ。この関数内で更新される。 :param pa: numpy.ndarray 前回のイテレーションで計算された予測係数。この関数内で更新される。 :param m: int 現在のARモデルの次数。 Returns ------- :returns: None `pg`, `pm`, `pa`, `pb1`, `pb2` はin-placeで更新されるため、明示的な戻り値はない。 """ sn, sd = 0.0, 0.0 for i in range(1, nmax+1): sn += pb1[i] * pb2[i] sd += pb1[i] * pb1[i] + pb2[i] * pb2[i] pg[m] = -2.0 * sn / sd pm *= (1.0 - pg[m] * pg[m]) if m != 1: for k in range(1, m): pg[k] = pa[k] + pg[m] * pa[m-k] for i in range(1, m): pa[i] = pg[i] for i in range(1, nmax - m + 1): pb1[i] += pa[m] * pb2[i] pb2[i] = pb2[i+1] + pa[m] * pb1[i+1]
[ドキュメント] def MEMBurg(nmax, mmx, lmax, px, pg, pc, pfpe): """ Burg法を用いて予測係数と予測誤差電力を計算する。 詳細説明: 与えられた時系列データ `px` から、Burg法に基づき最小エントロピー法のための 予測係数 (`pg`)、自己相関関数 (`pc`)、および予測誤差電力 (`pfpe`) を計算します。 `MEMLevins` 関数を利用して、ARモデルの次数を段階的に増やしながら係数を求めます。 Parameters ---------- :param nmax: int 時系列データの長さ。 :param mmx: int MEMモデルの最大次数(計算する予測係数の数)。 :param lmax: int 計算する自己相関関数の最大次数。 :param px: numpy.ndarray 入力時系列データ。 :param pg: numpy.ndarray 計算される予測係数(AR係数)を格納する配列。この関数内で更新される。 :param pc: numpy.ndarray 計算される自己相関関数を格納する配列。この関数内で更新される。 :param pfpe: numpy.ndarray 計算される予測誤差電力を格納する配列。この関数内で更新される。 Returns ------- :returns: float 最終的な予測誤差電力 `pm`。 """ nmat = max([mmx+2, nmax+2]) b1 = np.empty(nmat) b2 = np.empty(nmat) a = np.empty(nmat) sum = 0.0; for i in range(1, nmax + 1): sum += px[i] * px[i] pc[1] = sum / nmax pm = pc[1] pfpe[1] = (nmax + 1.0) / (nmax - 1.0) * pm lag = mmx + 1 lg1 = lag + 1 b1[1] = px[1] for i in range(2, nmax + 1): b1[i] = px[i] b2[i-1] = px[i] for m in range(1, mmx + 1): MEMLevins(nmax, pg, pm, b1, b2, a, m) sum = 0.0; for i in range(1, m + 1): sum -= pc[m-i] * pg[i] pc[m+1] = sum if m != nmax - 1: pfpe[m+1] = (nmax + m + 1) / (nmax - m - 1) * pm if lag < lmax: for l in range(lg1, lmax + 1): sum = 0.0 for i in range(1, mmx + 1): sum -= pc[l] * pg[i] pc[l] = sum return pm
[ドキュメント] def MEM(dt, px, mmx, lmax): """ 最小エントロピー法 (MEM) を実行し、スペクトルおよび関連情報を計算する。 詳細説明: Burg法 (`MEMBurg`) を用いて予測係数と予測誤差電力を計算し、 その後 `MEMSub` 関数を使用してパワースペクトル密度を計算します。 時系列データ `px` とサンプリング間隔 `dt` を受け取り、 MEM分析の主要な結果である周波数軸、スペクトル、予測誤差電力、自己相関関数を返します。 Parameters ---------- :param dt: float サンプリング間隔。 :param px: list or numpy.ndarray 入力時系列データ。 :param mmx: int MEMモデルの最大次数。 :param lmax: int 計算する自己相関関数の最大次数。 Returns ------- :returns f: numpy.ndarray 計算された周波数軸のデータ。 :returns fpe: numpy.ndarray 計算された予測誤差電力のデータ。 :returns spec: numpy.ndarray 計算されたパワースペクトル密度のデータ。 :returns c: numpy.ndarray 計算された自己相関関数(ARモデルの係数)。 :returns mmx: int MEMモデルの最大次数(入力値をそのまま返す)。 :returns lmax: int 自己相関関数の最大次数(入力値をそのまま返す)。 :returns pm: float 最終的な予測誤差電力。 """ nmax = len(px) nmat = max([mmx+2, nmax+2]) f = np.zeros(nmat) fpe = np.zeros(nmat) spec = np.zeros(nmat) c = np.zeros(nmat) g = np.zeros(nmat) x2 = [0.0] + px # 1-based indexにするため先頭にダミーを追加 pm = MEMBurg(nmax, mmx, lmax, x2, g, c, fpe) MEMSub(nmax, mmx, dt, g, f, spec, pm) return f, fpe, spec, c, mmx, lmax, pm
[ドキュメント] def MEMCall(x, y, mmx, lmax): """ MEM分析を実行し、結果を出力する。 詳細説明: 入力時系列データ `y` に対して前処理(平均除去)を行い、`MEM` 関数を呼び出してMEM分析を実行します。 計算された結果(周波数、スペクトル、予測誤差電力、自己相関関数)をコンソールに出力し、 これらの配列を返します。モデル次数 `mmx` と自己相関次数の最大値 `lmax` を調整します。 Parameters ---------- :param x: list or numpy.ndarray 時系列データの時間軸(x軸)データ。 :param y: list or numpy.ndarray MEM分析の対象となる時系列データ(y軸)データ。 :param mmx: int MEMモデルの最大次数。 :param lmax: int 計算する自己相関関数の最大次数。 Returns ------- :returns pf: numpy.ndarray 計算された周波数軸のデータ。 :returns pspec: numpy.ndarray 計算されたパワースペクトル密度のデータ。 :returns pfpe: numpy.ndarray 計算された予測誤差電力のデータ。 :returns pc: numpy.ndarray 計算された自己相関関数(ARモデルの係数)。 """ lag = mmx + 1 if lag+1 > lmax: lmax = lag + 1 nmax = len(x) sum = 0.0; for i in range(nmax): sum += y[i] # yの平均を計算 av = sum / nmax y_adjusted = [val - av for val in y] # 平均除去したy dt = x[1] - x[0] pf, pfpe, pspec, pc, mmx , lmax, pm = MEM(dt, y_adjusted, mmx , lmax) lag = mmx + 1 lg1 = lag + 1 lx1 = lmax + 1 print("\nf\tspc\tfpe\tauto c.") # インデックス1から表示するため、0はスキップ for i in range(1, min(lag, len(pf), len(pspec), len(pfpe), len(pc))): print("{:.4f}\t{:.4f}\t{:.4f}\t{:.4f}".format(pf[i] , pspec[i] , pfpe[i] , pc[i])) if lag != lmax: for i in range(lg1, min(lmax+1, len(pf), len(pspec), len(pc))): print("{:.4f}\t{:.4f}\t{:.4f}".format(pf[i], pspec[i], pc[i])) for i in range(lx1, min(nmax, len(pf), len(pspec))): print("{:.4f}\t{:.4f}".format(pf[i], pspec[i])) return pf, pspec, pfpe, pc
[ドキュメント] def Func(x): """ 複数の正弦波を組み合わせた模擬時系列データを生成する。 詳細説明: 3つの異なる周波数、位相、振幅を持つ正弦波の和に、 わずかなランダムノイズ(0.03 * random.random())を加えた値を返します。 この関数はMEM分析のテストデータとして使用されることを目的としています。 Parameters ---------- :param x: float 時間軸上の現在の位置。 Returns ------- :returns: float 計算された関数値にランダムノイズを加えたもの。 """ x += 0.03 * random.random() f1, p1, A1 = 1.5, pi/4.0, 1.0 f2, p2, A2 = 3.0, pi/3.0, 0.3 f3, p3, A3 = 10.0, pi/6.0, 0.5 return A1 * sin(2.0*pi * f1 * x + p1) \ + A2 * sin(2.0*pi * f2 * x + p2) \ + A3 * sin(2.0*pi * f3 * x + p3)
[ドキュメント] def main(): """ プログラムのエントリーポイント。MEM分析を実行し結果をプロットする。 詳細説明: この関数は、以下の主要なステップを実行します。 1. グローバル変数 `x0`, `x1`, `nData` に基づいて時間軸 `x` と模擬時系列データ `y` を生成します。 2. 生成されたデータに対して `MEMCall` 関数を呼び出し、MEM分析を実行します。 3. 分析結果である入力時系列とMEMスペクトルを `matplotlib` を使用してグラフ化し表示します。 4. ユーザーがEnterキーを押すまでグラフを表示し続けます。 Parameters ---------- :param None Returns ------- :returns: None """ xstep = (x1 - x0) / nData print("x = ({}, {}, {})".format(x0, x1, xstep)) print("nData=", nData) print("W(Gauss)=", WG) x = [] y = [] for i in range(nData): xx = x0 + i * xstep x.append(xx) y.append(Func(xx)) pf, pspec, pfpe, pc = MEMCall(x, y, mmx, lmax) #============================= # Plot graphs #============================= fig = plt.figure() ax1 = fig.add_subplot(2, 1, 1) ax2 = fig.add_subplot(2, 1, 2) ax1.plot(x, y, label = 'input') ax1.set_xlabel("t") ax1.set_ylabel("y") ax1.legend() # pfとpspecのインデックス0にはダミー値が入っている場合があるので、1から描画することが適切 # ただし、元のコードが全てをプロットする意図ならそのまま # MEM関数やMEMCall関数の処理を考慮すると、pf, pspecはインデックス1から有効なデータが入ると考えられる。 # しかし、Docstringのルールは既存ロジックを変更しないことなので、そのままプロットする。 ax2.plot(pf, pspec, label = 'mem') ax2.set_xlabel("f") ax2.set_ylabel("mem") ax2.legend() plt.tight_layout() plt.pause(0.1) print("Press ENTER to exit>>", end = '') input()
if __name__ == '__main__': main()