import sys
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.animation import PillowWriter
#from matplotlib.animation import FuncAnimation, FFMpegWriter
"""
ガウス過程回帰を用いたベイズ最適化のアニメーションを生成するスクリプトです。
詳細説明:
指定された選択戦略(ランダム、不確実性最大、予測値最大、UCB、EI)に基づき、
新しい観測点を逐次的に追加し、ガウス過程モデルの予測をアニメーションで可視化します。
:doc:`gp_simulation_animation_usage`
"""
# mode = "random" / "std" / "max" / "ucb" / "ei"
#mode = "random"
#mode = "std"
#mode = "max"
mode = "ucb"
outfile = ""
#outfile = "gp_std_mode.gif"
nmaxiter = 20
if __name__ == "__main__":
argv = sys.argv
nargs = len(argv)
if nargs >= 2: mode = argv[1]
if nargs >= 3: nmaxiter = int(argv[2])
if nargs >= 4: outfile = argv[3]
plt.rcParams['font.family'] = 'MS Gothic'
plt.rcParams['axes.unicode_minus'] = False
# ==== 真の関数 ====
[ドキュメント]
def f_true(x):
"""
目的となる真の関数 f(x) を計算します。
詳細説明:
ガウス過程回帰のシミュレーションにおいて、未知の真の関数として使用されます。
:param x: `float` または `numpy.ndarray` - 入力値。
:returns: `float` または `numpy.ndarray` - f(x) の出力値。
"""
return np.sin(2 * np.pi * x) + 0.3 * np.cos(4 * np.pi * x)
# ==== RBFカーネル ====
[ドキュメント]
def rbf_kernel(x1, x2, length_scale=0.2, sigma_f=1.0):
"""
RBF (Radial Basis Function) カーネル行列を計算します。
詳細説明:
2つの入力データ間の類似度をガウス関数に基づいて評価します。
ガウス過程モデルの共分散関数として用いられます。
:param x1: `numpy.ndarray` - 1つ目の入力データ点群。
:param x2: `numpy.ndarray` - 2つ目の入力データ点群。
:param length_scale: `float` - カーネルの長さスケールパラメータ。デフォルトは0.2。
:param sigma_f: `float` - カーネルの振幅パラメータ。デフォルトは1.0。
:returns: `numpy.ndarray` - 計算されたRBFカーネル行列。
"""
x1 = np.atleast_2d(x1).T
x2 = np.atleast_2d(x2).T
d2 = (x1 - x2.T) ** 2
return sigma_f**2 * np.exp(-0.5 * d2 / length_scale**2)
# ==== ガウス過程回帰 ====
[ドキュメント]
def gp_posterior(x_train, y_train, x_test, noise_sigma,
length_scale=0.2, sigma_f=1.0):
"""
ガウス過程回帰の事後分布の平均と分散を計算します。
詳細説明:
訓練データ (x_train, y_train) とノイズ分散 noise_sigma を用いて、
新しいテストデータ x_test における事後予測の平均と分散を計算します。
コレスキー分解 (Cholesky decomposition) を使用して数値安定性を確保します。
:param x_train: `numpy.ndarray` - 訓練データ点のX座標。
:param y_train: `numpy.ndarray` - 訓練データ点のY座標。
:param x_test: `numpy.ndarray` - 予測を行うテストデータ点のX座標。
:param noise_sigma: `float` - 観測ノイズの標準偏差。
:param length_scale: `float` - RBFカーネルの長さスケール。デフォルトは0.2。
:param sigma_f: `float` - RBFカーネルの振幅。デフォルトは1.0。
:returns: `tuple[numpy.ndarray, numpy.ndarray]` - 事後平均 mu と事後分散 var。
"""
K = rbf_kernel(x_train, x_train, length_scale, sigma_f)
K_s = rbf_kernel(x_train, x_test, length_scale, sigma_f)
K_ss = rbf_kernel(x_test, x_test, length_scale, sigma_f)
K_y = K + (noise_sigma**2) * np.eye(len(x_train))
L = np.linalg.cholesky(K_y)
alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
mu = K_s.T @ alpha
v = np.linalg.solve(L, K_s)
cov = K_ss - v.T @ v
var = np.clip(np.diag(cov), 0, np.inf)
return mu, var
# ==== 既存点に近すぎる場合のオフセット ====
[ドキュメント]
def avoid_duplicate(x_new, x_train, eps=1e-3):
"""
新しいデータ点が既存の訓練データ点に極端に近接するのを防ぎます。
詳細説明:
x_new が x_train 内のいずれかの点と `eps` よりも近い場合、
`x_new` をわずかにずらして重複を避けます。ずらす際には [-0.01, 0.01] の
一様乱数を加算し、[0.0, 1.0] の範囲にクリップします。
:param x_new: `float` - 新しく追加しようとしているX座標。
:param x_train: `numpy.ndarray` - 既存の訓練データ点のX座標の配列。
:param eps: `float` - 許容される最小距離。この値より近い場合はずらす。デフォルトは1e-3。
:returns: `float` - 調整された新しいX座標。
"""
if len(x_train) == 0:
return x_new
while np.min(np.abs(x_train - x_new)) < eps:
x_new += np.random.uniform(-0.01, 0.01)
x_new = np.clip(x_new, 0.0, 1.0)
return x_new
# ==== メイン ====
[ドキュメント]
def main(mode="random", nmaxiter=40, savefile="gp_animation.mp4"):
"""
ガウス過程回帰を用いたベイズ最適化のプロセスをアニメーションで可視化します。
詳細説明:
指定された選択モード (`random`, `std`, `max`, `ucb`, `ei`) に基づいて
逐次的に新しい観測点を追加し、ガウス過程モデルの予測平均と95%信頼区間を
更新しながらアニメーションとして表示または保存します。
:param mode: `str` - 新しい観測点の選択戦略。
`"random"`: ランダムに点をサンプリング。
`"std"`: 予測の標準偏差が最大の点をサンプリング。
`"max"`: 予測平均が最大の点をサンプリング。
`"ucb"`: 上側信頼限界 (Upper Confidence Bound) が最大の点をサンプリング。
`"ei"`: 期待改善量 (Expected Improvement) が最大の点をサンプリング。
デフォルトは`"random"`。
:param nmaxiter: `int` - アニメーションのフレーム数(観測点の追加回数)。デフォルトは40。
:param savefile: `str` - アニメーションの保存先ファイル名。Noneまたは空文字列の場合、保存しない。デフォルトは`"gp_animation.mp4"`。
:returns: `None`
"""
rng = np.random.default_rng(1)
x_data = []
y_data = []
noise_sigma = 0.15
x_plot = np.linspace(0, 1, 200)
y_true = f_true(x_plot)
fig, ax = plt.subplots(figsize=(8, 5))
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.plot(x_plot, y_true, "k--", label="真の関数 f(x)")
scatter = ax.scatter([], [], color="black", label="観測データ")
mean_line, = ax.plot([], [], color="C0", label="GP 予測平均")
uncert_band = None
ax.legend(loc="upper right")
ax.set_xlim(0, 1)
ax.set_ylim(-2.0, 2.0)
# ---- アニメーション更新 ----
def update(frame):
"""
アニメーションの各フレームでガウス過程回帰モデルとプロットを更新します。
詳細説明:
指定されたモードに基づき次の観測点を選択し、その点を訓練データに追加して
ガウス過程の事後分布を再計算し、プロット上の散布図、予測平均、
不確実性バンドを更新します。
:param frame: `int` - 現在のアニメーションフレーム番号(0からnmaxiter-1)。
:returns: `tuple[matplotlib.artist.Artist, matplotlib.artist.Artist, matplotlib.collections.PolyCollection]` -
更新されたMatplotlib Artistオブジェクトのタプル。
具体的には、散布図 (scatter)、予測平均線 (mean_line)、不確実性バンド (uncert_band)。
"""
nonlocal uncert_band, x_data, y_data
# 最初の1点はランダム
if len(x_data) == 0:
x_new = rng.uniform(0, 1)
else:
x_train = np.array(x_data)
y_train = np.array(y_data)
mu, var = gp_posterior(x_train, y_train, x_plot, noise_sigma)
std = np.sqrt(var)
if mode == "random":
x_new = rng.uniform(0, 1)
elif mode == "std":
idx = np.argmax(std)
x_new = x_plot[idx]
elif mode == "max":
idx = np.argmax(mu)
x_new = x_plot[idx]
elif mode == "ucb":
kappa = 2.0 # 探索の強さ
ucb = mu + kappa * std
idx = np.argmax(ucb)
x_new = x_plot[idx]
elif mode == "ei":
y_best = np.max(y_train)
improvement = mu - y_best
Z = improvement / (std + 1e-9)
ei = improvement * norm.cdf(Z) + std * norm.pdf(Z)
ei[std < 1e-12] = 0.0
idx = np.argmax(ei)
x_new = x_plot[idx]
else:
raise ValueError("mode must be 'random', 'std', or 'max'")
# 既存点と近すぎる場合はオフセット
x_new = avoid_duplicate(x_new, np.array(x_data))
# 新しい観測値
y_new = f_true(x_new) + rng.normal(0.0, noise_sigma)
x_data.append(x_new)
y_data.append(y_new)
# ---- GP posterior 再計算 ----
x_train = np.array(x_data)
y_train = np.array(y_data)
mu, var = gp_posterior(x_train, y_train, x_plot, noise_sigma)
std = np.sqrt(var)
scatter.set_offsets(np.column_stack([x_train, y_train]))
mean_line.set_data(x_plot, mu)
if uncert_band is not None:
uncert_band.remove()
uncert_band = ax.fill_between(
x_plot,
mu - 2 * std,
mu + 2 * std,
color="C0",
alpha=0.2
)
ax.set_title(f"mode={mode} | データ数 n = {len(x_train)}")
return scatter, mean_line, uncert_band
# ---- アニメーション作成 ----
anim = FuncAnimation(
fig,
update,
frames=nmaxiter,
interval=500,
blit=False,
repeat=False
)
# ---- 動画保存 ----
if savefile is None or savefile == "":
pass
else:
# GIF で保存
writer = PillowWriter(fps=2)
anim.save(savefile, writer=writer)
# mp4に保存
# writer = FFMpegWriter(fps=2)
# anim.save(savefile, writer=writer)
print(f"Saved animation to {savefile}")
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main(mode=mode, nmaxiter=nmaxiter, savefile=outfile)