"""
MEM.py - 最小エントロピー法 (Maximum Entropy Method) によるスペクトル分析
概要:
このモジュールは、Burg法を用いた最小エントロピー法により、時系列データからパワースペクトル密度を計算します。
また、模擬データ生成、MEM分析の実行、結果のプロット機能を提供します。
詳細説明:
本スクリプトは、時系列データに対してMEM分析を適用し、そのパワースペクトル密度を推定します。
Burg法を使用してARモデルの係数を導出し、それらの係数からスペクトルを計算します。
レビンソン・ダービン再帰アルゴリズムが内部で使用されます。
分析結果はコンソールに出力され、matplotlibを用いて入力時系列とMEMスペクトルがグラフとして表示されます。
関連リンク:
:doc:`mem_usage`
"""
import sys
from math import pi, sin, cos, tan, exp
import random
import numpy as np
from matplotlib import pyplot as plt
#x0, x1, nData = 0, 1.0, 128
x0, x1, nData = 0, 1.0, 1024
WG = 0.02
IgnoreZero = 0
ConvFunc = 'Gaussian'
mmx = 38
lmax = 50
[ドキュメント]
def MEMSub(nmax, mmx, dt, pg, pf, pspec, pm):
"""
MEMスペクトルを計算する。
詳細説明:
最小エントロピー法(MEM)におけるスペクトル計算の補助関数。
Burg法で得られた予測係数を用いてパワースペクトル密度を算出します。
周波数軸 `pf` とパワースペクトル `pspec` はこの関数内で計算され更新されます。
Parameters
----------
:param nmax: int
時系列データの長さ。
:param mmx: int
MEMモデルの最大次数(予測係数の数)。
:param dt: float
サンプリング間隔。
:param pg: numpy.ndarray
Burg法で計算された予測係数 (AR係数)。
:param pf: numpy.ndarray
計算される周波数軸の値を格納する配列。この関数内で更新される。
:param pspec: numpy.ndarray
計算されるパワースペクトル密度を格納する配列。この関数内で更新される。
:param pm: float
最後の予測誤差電力。
Returns
-------
:returns: None
`pf` と `pspec` はin-placeで更新されるため、明示的な戻り値はない。
"""
f0 = 1.0 / dt / (nmax - 1.0)
for i in range(1, nmax+1):
pf[i] = f0 * i
sumr = 1.0
sumi = 0.0
cr = 0.0
ci = 2.0 * pi * pf[i] * dt
for j in range(1, mmx+1):
sumr += pg[j] * cos(j * ci)
sumi += pg[j] * sin(j * ci)
pspec[i] = 2.0 * pm * dt / (sumr * sumr + sumi * sumi)
[ドキュメント]
def MEMLevins(nmax, pg, pm, pb1, pb2, pa, m):
"""
レビンソン・ダービン再帰アルゴリズムの一部を実装する。
詳細説明:
MEMのBurg法における予測誤差フィルタ係数 (`pg`) を計算するために使用される、
レビンソン・ダービン再帰アルゴリズムの主要なステップを実装します。
これは、ARモデルの次数を1ずつ増やしながら係数を効率的に計算する方法です。
Parameters
----------
:param nmax: int
時系列データの長さ。
:param pg: numpy.ndarray
計算される予測係数(AR係数)を格納する配列。この関数内で更新される。
:param pm: float
現在の予測誤差電力。この関数内で更新される。
:param pb1: numpy.ndarray
前方予測誤差の一時的なバッファ。この関数内で更新される。
:param pb2: numpy.ndarray
後方予測誤差の一時的なバッファ。この関数内で更新される。
:param pa: numpy.ndarray
前回のイテレーションで計算された予測係数。この関数内で更新される。
:param m: int
現在のARモデルの次数。
Returns
-------
:returns: None
`pg`, `pm`, `pa`, `pb1`, `pb2` はin-placeで更新されるため、明示的な戻り値はない。
"""
sn, sd = 0.0, 0.0
for i in range(1, nmax+1):
sn += pb1[i] * pb2[i]
sd += pb1[i] * pb1[i] + pb2[i] * pb2[i]
pg[m] = -2.0 * sn / sd
pm *= (1.0 - pg[m] * pg[m])
if m != 1:
for k in range(1, m):
pg[k] = pa[k] + pg[m] * pa[m-k]
for i in range(1, m):
pa[i] = pg[i]
for i in range(1, nmax - m + 1):
pb1[i] += pa[m] * pb2[i]
pb2[i] = pb2[i+1] + pa[m] * pb1[i+1]
[ドキュメント]
def MEMBurg(nmax, mmx, lmax, px, pg, pc, pfpe):
"""
Burg法を用いて予測係数と予測誤差電力を計算する。
詳細説明:
与えられた時系列データ `px` から、Burg法に基づき最小エントロピー法のための
予測係数 (`pg`)、自己相関関数 (`pc`)、および予測誤差電力 (`pfpe`) を計算します。
`MEMLevins` 関数を利用して、ARモデルの次数を段階的に増やしながら係数を求めます。
Parameters
----------
:param nmax: int
時系列データの長さ。
:param mmx: int
MEMモデルの最大次数(計算する予測係数の数)。
:param lmax: int
計算する自己相関関数の最大次数。
:param px: numpy.ndarray
入力時系列データ。
:param pg: numpy.ndarray
計算される予測係数(AR係数)を格納する配列。この関数内で更新される。
:param pc: numpy.ndarray
計算される自己相関関数を格納する配列。この関数内で更新される。
:param pfpe: numpy.ndarray
計算される予測誤差電力を格納する配列。この関数内で更新される。
Returns
-------
:returns: float
最終的な予測誤差電力 `pm`。
"""
nmat = max([mmx+2, nmax+2])
b1 = np.empty(nmat)
b2 = np.empty(nmat)
a = np.empty(nmat)
sum = 0.0;
for i in range(1, nmax + 1):
sum += px[i] * px[i]
pc[1] = sum / nmax
pm = pc[1]
pfpe[1] = (nmax + 1.0) / (nmax - 1.0) * pm
lag = mmx + 1
lg1 = lag + 1
b1[1] = px[1]
for i in range(2, nmax + 1):
b1[i] = px[i]
b2[i-1] = px[i]
for m in range(1, mmx + 1):
MEMLevins(nmax, pg, pm, b1, b2, a, m)
sum = 0.0;
for i in range(1, m + 1):
sum -= pc[m-i] * pg[i]
pc[m+1] = sum
if m != nmax - 1:
pfpe[m+1] = (nmax + m + 1) / (nmax - m - 1) * pm
if lag < lmax:
for l in range(lg1, lmax + 1):
sum = 0.0
for i in range(1, mmx + 1):
sum -= pc[l] * pg[i]
pc[l] = sum
return pm
[ドキュメント]
def MEM(dt, px, mmx, lmax):
"""
最小エントロピー法 (MEM) を実行し、スペクトルおよび関連情報を計算する。
詳細説明:
Burg法 (`MEMBurg`) を用いて予測係数と予測誤差電力を計算し、
その後 `MEMSub` 関数を使用してパワースペクトル密度を計算します。
時系列データ `px` とサンプリング間隔 `dt` を受け取り、
MEM分析の主要な結果である周波数軸、スペクトル、予測誤差電力、自己相関関数を返します。
Parameters
----------
:param dt: float
サンプリング間隔。
:param px: list or numpy.ndarray
入力時系列データ。
:param mmx: int
MEMモデルの最大次数。
:param lmax: int
計算する自己相関関数の最大次数。
Returns
-------
:returns f: numpy.ndarray
計算された周波数軸のデータ。
:returns fpe: numpy.ndarray
計算された予測誤差電力のデータ。
:returns spec: numpy.ndarray
計算されたパワースペクトル密度のデータ。
:returns c: numpy.ndarray
計算された自己相関関数(ARモデルの係数)。
:returns mmx: int
MEMモデルの最大次数(入力値をそのまま返す)。
:returns lmax: int
自己相関関数の最大次数(入力値をそのまま返す)。
:returns pm: float
最終的な予測誤差電力。
"""
nmax = len(px)
nmat = max([mmx+2, nmax+2])
f = np.zeros(nmat)
fpe = np.zeros(nmat)
spec = np.zeros(nmat)
c = np.zeros(nmat)
g = np.zeros(nmat)
x2 = [0.0] + px # 1-based indexにするため先頭にダミーを追加
pm = MEMBurg(nmax, mmx, lmax, x2, g, c, fpe)
MEMSub(nmax, mmx, dt, g, f, spec, pm)
return f, fpe, spec, c, mmx, lmax, pm
[ドキュメント]
def MEMCall(x, y, mmx, lmax):
"""
MEM分析を実行し、結果を出力する。
詳細説明:
入力時系列データ `y` に対して前処理(平均除去)を行い、`MEM` 関数を呼び出してMEM分析を実行します。
計算された結果(周波数、スペクトル、予測誤差電力、自己相関関数)をコンソールに出力し、
これらの配列を返します。モデル次数 `mmx` と自己相関次数の最大値 `lmax` を調整します。
Parameters
----------
:param x: list or numpy.ndarray
時系列データの時間軸(x軸)データ。
:param y: list or numpy.ndarray
MEM分析の対象となる時系列データ(y軸)データ。
:param mmx: int
MEMモデルの最大次数。
:param lmax: int
計算する自己相関関数の最大次数。
Returns
-------
:returns pf: numpy.ndarray
計算された周波数軸のデータ。
:returns pspec: numpy.ndarray
計算されたパワースペクトル密度のデータ。
:returns pfpe: numpy.ndarray
計算された予測誤差電力のデータ。
:returns pc: numpy.ndarray
計算された自己相関関数(ARモデルの係数)。
"""
lag = mmx + 1
if lag+1 > lmax:
lmax = lag + 1
nmax = len(x)
sum = 0.0;
for i in range(nmax):
sum += y[i] # yの平均を計算
av = sum / nmax
y_adjusted = [val - av for val in y] # 平均除去したy
dt = x[1] - x[0]
pf, pfpe, pspec, pc, mmx , lmax, pm = MEM(dt, y_adjusted, mmx , lmax)
lag = mmx + 1
lg1 = lag + 1
lx1 = lmax + 1
print("\nf\tspc\tfpe\tauto c.")
# インデックス1から表示するため、0はスキップ
for i in range(1, min(lag, len(pf), len(pspec), len(pfpe), len(pc))):
print("{:.4f}\t{:.4f}\t{:.4f}\t{:.4f}".format(pf[i] , pspec[i] , pfpe[i] , pc[i]))
if lag != lmax:
for i in range(lg1, min(lmax+1, len(pf), len(pspec), len(pc))):
print("{:.4f}\t{:.4f}\t{:.4f}".format(pf[i], pspec[i], pc[i]))
for i in range(lx1, min(nmax, len(pf), len(pspec))):
print("{:.4f}\t{:.4f}".format(pf[i], pspec[i]))
return pf, pspec, pfpe, pc
[ドキュメント]
def Func(x):
"""
複数の正弦波を組み合わせた模擬時系列データを生成する。
詳細説明:
3つの異なる周波数、位相、振幅を持つ正弦波の和に、
わずかなランダムノイズ(0.03 * random.random())を加えた値を返します。
この関数はMEM分析のテストデータとして使用されることを目的としています。
Parameters
----------
:param x: float
時間軸上の現在の位置。
Returns
-------
:returns: float
計算された関数値にランダムノイズを加えたもの。
"""
x += 0.03 * random.random()
f1, p1, A1 = 1.5, pi/4.0, 1.0
f2, p2, A2 = 3.0, pi/3.0, 0.3
f3, p3, A3 = 10.0, pi/6.0, 0.5
return A1 * sin(2.0*pi * f1 * x + p1) \
+ A2 * sin(2.0*pi * f2 * x + p2) \
+ A3 * sin(2.0*pi * f3 * x + p3)
[ドキュメント]
def main():
"""
プログラムのエントリーポイント。MEM分析を実行し結果をプロットする。
詳細説明:
この関数は、以下の主要なステップを実行します。
1. グローバル変数 `x0`, `x1`, `nData` に基づいて時間軸 `x` と模擬時系列データ `y` を生成します。
2. 生成されたデータに対して `MEMCall` 関数を呼び出し、MEM分析を実行します。
3. 分析結果である入力時系列とMEMスペクトルを `matplotlib` を使用してグラフ化し表示します。
4. ユーザーがEnterキーを押すまでグラフを表示し続けます。
Parameters
----------
:param None
Returns
-------
:returns: None
"""
xstep = (x1 - x0) / nData
print("x = ({}, {}, {})".format(x0, x1, xstep))
print("nData=", nData)
print("W(Gauss)=", WG)
x = []
y = []
for i in range(nData):
xx = x0 + i * xstep
x.append(xx)
y.append(Func(xx))
pf, pspec, pfpe, pc = MEMCall(x, y, mmx, lmax)
#=============================
# Plot graphs
#=============================
fig = plt.figure()
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
ax1.plot(x, y, label = 'input')
ax1.set_xlabel("t")
ax1.set_ylabel("y")
ax1.legend()
# pfとpspecのインデックス0にはダミー値が入っている場合があるので、1から描画することが適切
# ただし、元のコードが全てをプロットする意図ならそのまま
# MEM関数やMEMCall関数の処理を考慮すると、pf, pspecはインデックス1から有効なデータが入ると考えられる。
# しかし、Docstringのルールは既存ロジックを変更しないことなので、そのままプロットする。
ax2.plot(pf, pspec, label = 'mem')
ax2.set_xlabel("f")
ax2.set_ylabel("mem")
ax2.legend()
plt.tight_layout()
plt.pause(0.1)
print("Press ENTER to exit>>", end = '')
input()
if __name__ == '__main__':
main()