PHYSBO.gp_simulation_animation のソースコード

import sys
import numpy as np
from scipy.stats import norm

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.animation import PillowWriter
#from matplotlib.animation import FuncAnimation, FFMpegWriter

"""
ガウス過程回帰を用いたベイズ最適化のアニメーションを生成するスクリプトです。

詳細説明:
    指定された選択戦略(ランダム、不確実性最大、予測値最大、UCB、EI)に基づき、
    新しい観測点を逐次的に追加し、ガウス過程モデルの予測をアニメーションで可視化します。

:doc:`gp_simulation_animation_usage`
"""

# mode = "random" / "std" / "max" / "ucb" / "ei"
#mode = "random"
#mode = "std"
#mode = "max"
mode = "ucb"

outfile = ""
#outfile = "gp_std_mode.gif"
nmaxiter = 20

if __name__ == "__main__":
    argv = sys.argv
    nargs = len(argv)
    if nargs >= 2: mode = argv[1]
    if nargs >= 3: nmaxiter = int(argv[2])
    if nargs >= 4: outfile = argv[3]


plt.rcParams['font.family'] = 'MS Gothic'
plt.rcParams['axes.unicode_minus'] = False

# ==== 真の関数 ====
[ドキュメント] def f_true(x): """ 目的となる真の関数 f(x) を計算します。 詳細説明: ガウス過程回帰のシミュレーションにおいて、未知の真の関数として使用されます。 :param x: `float` または `numpy.ndarray` - 入力値。 :returns: `float` または `numpy.ndarray` - f(x) の出力値。 """ return np.sin(2 * np.pi * x) + 0.3 * np.cos(4 * np.pi * x)
# ==== RBFカーネル ====
[ドキュメント] def rbf_kernel(x1, x2, length_scale=0.2, sigma_f=1.0): """ RBF (Radial Basis Function) カーネル行列を計算します。 詳細説明: 2つの入力データ間の類似度をガウス関数に基づいて評価します。 ガウス過程モデルの共分散関数として用いられます。 :param x1: `numpy.ndarray` - 1つ目の入力データ点群。 :param x2: `numpy.ndarray` - 2つ目の入力データ点群。 :param length_scale: `float` - カーネルの長さスケールパラメータ。デフォルトは0.2。 :param sigma_f: `float` - カーネルの振幅パラメータ。デフォルトは1.0。 :returns: `numpy.ndarray` - 計算されたRBFカーネル行列。 """ x1 = np.atleast_2d(x1).T x2 = np.atleast_2d(x2).T d2 = (x1 - x2.T) ** 2 return sigma_f**2 * np.exp(-0.5 * d2 / length_scale**2)
# ==== ガウス過程回帰 ====
[ドキュメント] def gp_posterior(x_train, y_train, x_test, noise_sigma, length_scale=0.2, sigma_f=1.0): """ ガウス過程回帰の事後分布の平均と分散を計算します。 詳細説明: 訓練データ (x_train, y_train) とノイズ分散 noise_sigma を用いて、 新しいテストデータ x_test における事後予測の平均と分散を計算します。 コレスキー分解 (Cholesky decomposition) を使用して数値安定性を確保します。 :param x_train: `numpy.ndarray` - 訓練データ点のX座標。 :param y_train: `numpy.ndarray` - 訓練データ点のY座標。 :param x_test: `numpy.ndarray` - 予測を行うテストデータ点のX座標。 :param noise_sigma: `float` - 観測ノイズの標準偏差。 :param length_scale: `float` - RBFカーネルの長さスケール。デフォルトは0.2。 :param sigma_f: `float` - RBFカーネルの振幅。デフォルトは1.0。 :returns: `tuple[numpy.ndarray, numpy.ndarray]` - 事後平均 mu と事後分散 var。 """ K = rbf_kernel(x_train, x_train, length_scale, sigma_f) K_s = rbf_kernel(x_train, x_test, length_scale, sigma_f) K_ss = rbf_kernel(x_test, x_test, length_scale, sigma_f) K_y = K + (noise_sigma**2) * np.eye(len(x_train)) L = np.linalg.cholesky(K_y) alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train)) mu = K_s.T @ alpha v = np.linalg.solve(L, K_s) cov = K_ss - v.T @ v var = np.clip(np.diag(cov), 0, np.inf) return mu, var
# ==== 既存点に近すぎる場合のオフセット ====
[ドキュメント] def avoid_duplicate(x_new, x_train, eps=1e-3): """ 新しいデータ点が既存の訓練データ点に極端に近接するのを防ぎます。 詳細説明: x_new が x_train 内のいずれかの点と `eps` よりも近い場合、 `x_new` をわずかにずらして重複を避けます。ずらす際には [-0.01, 0.01] の 一様乱数を加算し、[0.0, 1.0] の範囲にクリップします。 :param x_new: `float` - 新しく追加しようとしているX座標。 :param x_train: `numpy.ndarray` - 既存の訓練データ点のX座標の配列。 :param eps: `float` - 許容される最小距離。この値より近い場合はずらす。デフォルトは1e-3。 :returns: `float` - 調整された新しいX座標。 """ if len(x_train) == 0: return x_new while np.min(np.abs(x_train - x_new)) < eps: x_new += np.random.uniform(-0.01, 0.01) x_new = np.clip(x_new, 0.0, 1.0) return x_new
# ==== メイン ====
[ドキュメント] def main(mode="random", nmaxiter=40, savefile="gp_animation.mp4"): """ ガウス過程回帰を用いたベイズ最適化のプロセスをアニメーションで可視化します。 詳細説明: 指定された選択モード (`random`, `std`, `max`, `ucb`, `ei`) に基づいて 逐次的に新しい観測点を追加し、ガウス過程モデルの予測平均と95%信頼区間を 更新しながらアニメーションとして表示または保存します。 :param mode: `str` - 新しい観測点の選択戦略。 `"random"`: ランダムに点をサンプリング。 `"std"`: 予測の標準偏差が最大の点をサンプリング。 `"max"`: 予測平均が最大の点をサンプリング。 `"ucb"`: 上側信頼限界 (Upper Confidence Bound) が最大の点をサンプリング。 `"ei"`: 期待改善量 (Expected Improvement) が最大の点をサンプリング。 デフォルトは`"random"`。 :param nmaxiter: `int` - アニメーションのフレーム数(観測点の追加回数)。デフォルトは40。 :param savefile: `str` - アニメーションの保存先ファイル名。Noneまたは空文字列の場合、保存しない。デフォルトは`"gp_animation.mp4"`。 :returns: `None` """ rng = np.random.default_rng(1) x_data = [] y_data = [] noise_sigma = 0.15 x_plot = np.linspace(0, 1, 200) y_true = f_true(x_plot) fig, ax = plt.subplots(figsize=(8, 5)) ax.set_xlabel("x") ax.set_ylabel("y") ax.plot(x_plot, y_true, "k--", label="真の関数 f(x)") scatter = ax.scatter([], [], color="black", label="観測データ") mean_line, = ax.plot([], [], color="C0", label="GP 予測平均") uncert_band = None ax.legend(loc="upper right") ax.set_xlim(0, 1) ax.set_ylim(-2.0, 2.0) # ---- アニメーション更新 ---- def update(frame): """ アニメーションの各フレームでガウス過程回帰モデルとプロットを更新します。 詳細説明: 指定されたモードに基づき次の観測点を選択し、その点を訓練データに追加して ガウス過程の事後分布を再計算し、プロット上の散布図、予測平均、 不確実性バンドを更新します。 :param frame: `int` - 現在のアニメーションフレーム番号(0からnmaxiter-1)。 :returns: `tuple[matplotlib.artist.Artist, matplotlib.artist.Artist, matplotlib.collections.PolyCollection]` - 更新されたMatplotlib Artistオブジェクトのタプル。 具体的には、散布図 (scatter)、予測平均線 (mean_line)、不確実性バンド (uncert_band)。 """ nonlocal uncert_band, x_data, y_data # 最初の1点はランダム if len(x_data) == 0: x_new = rng.uniform(0, 1) else: x_train = np.array(x_data) y_train = np.array(y_data) mu, var = gp_posterior(x_train, y_train, x_plot, noise_sigma) std = np.sqrt(var) if mode == "random": x_new = rng.uniform(0, 1) elif mode == "std": idx = np.argmax(std) x_new = x_plot[idx] elif mode == "max": idx = np.argmax(mu) x_new = x_plot[idx] elif mode == "ucb": kappa = 2.0 # 探索の強さ ucb = mu + kappa * std idx = np.argmax(ucb) x_new = x_plot[idx] elif mode == "ei": y_best = np.max(y_train) improvement = mu - y_best Z = improvement / (std + 1e-9) ei = improvement * norm.cdf(Z) + std * norm.pdf(Z) ei[std < 1e-12] = 0.0 idx = np.argmax(ei) x_new = x_plot[idx] else: raise ValueError("mode must be 'random', 'std', or 'max'") # 既存点と近すぎる場合はオフセット x_new = avoid_duplicate(x_new, np.array(x_data)) # 新しい観測値 y_new = f_true(x_new) + rng.normal(0.0, noise_sigma) x_data.append(x_new) y_data.append(y_new) # ---- GP posterior 再計算 ---- x_train = np.array(x_data) y_train = np.array(y_data) mu, var = gp_posterior(x_train, y_train, x_plot, noise_sigma) std = np.sqrt(var) scatter.set_offsets(np.column_stack([x_train, y_train])) mean_line.set_data(x_plot, mu) if uncert_band is not None: uncert_band.remove() uncert_band = ax.fill_between( x_plot, mu - 2 * std, mu + 2 * std, color="C0", alpha=0.2 ) ax.set_title(f"mode={mode} | データ数 n = {len(x_train)}") return scatter, mean_line, uncert_band # ---- アニメーション作成 ---- anim = FuncAnimation( fig, update, frames=nmaxiter, interval=500, blit=False, repeat=False ) # ---- 動画保存 ---- if savefile is None or savefile == "": pass else: # GIF で保存 writer = PillowWriter(fps=2) anim.save(savefile, writer=writer) # mp4に保存 # writer = FFMpegWriter(fps=2) # anim.save(savefile, writer=writer) print(f"Saved animation to {savefile}") plt.tight_layout() plt.show()
if __name__ == "__main__": main(mode=mode, nmaxiter=nmaxiter, savefile=outfile)