"""
概要:
1次元ベイズ最適化のシミュレーションとアニメーションを生成します。
詳細説明:
tkboライブラリを使用して、様々なモデルと獲得関数によるベイズ最適化の進行を可視化します。
各ステップで次の評価点がどのように選択され、ガウス過程の予測平均と不確実性がどのように更新されるかを
アニメーションで示します。
主な機能:
獲得関数「posterior mean maximization (平均値最大化)」のカスタム実装。
真の目的関数 f_true の定義。
コマンドラインインターフェースの構築と引数の正規化。
tkboオプティマイザの作成と設定。
ベイズ最適化のステップごとの実行と結果のプロット。
アニメーションの保存(GIF形式)と表示。
"""
from __future__ import annotations
import argparse
import traceback
import warnings
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
from sklearn.exceptions import ConvergenceWarning
from tkbo import create_optimizer
from tkbo.base import BaseAcquisition
from tkbo.registry import register_acquisition
"""
Examples:
python gp_simulation_animation_tkbo.py --mode ucb --model sklearn_gpr --nmaxiter 120 --save 0 --show 1
python gp_simulation_animation_tkbo.py --mode ei --model sklearn_gpr --nmaxiter 120 --save 0 --show 1
python gp_simulation_animation_tkbo.py --mode stein --model physbo_gp --nmaxiter 120 --save 0 --show 1
python gp_simulation_animation_tkbo.py --model physbo --acquisition EI --nmaxiter 120 --save 0 --show 1
python gp_simulation_animation_tkbo.py --model random --nmaxiter 120 --save 0 --show 1
python gp_simulation_animation_tkbo.py --model random --nmaxiter 20 --outfile random.gif
"""
warnings.filterwarnings("ignore", category=ConvergenceWarning)
# ============================================================
# Custom acquisition: posterior mean maximization
# ============================================================
[ドキュメント]
class MeanAcquisition(BaseAcquisition):
"""
概要:
後方分布の平均値を最大化する獲得関数。
詳細説明:
tkboのBaseAcquisitionを継承し、予測された平均値が最も高い候補点を選択します。
maximizeがTrueの場合は平均値をそのまま返し、Falseの場合は平均値の負の値を返します。
属性:
:param name: 獲得関数の名前。
:type name: str
"""
name = "mean"
def __call__(
self,
X,
mean,
std,
y_observed=None,
observed_mask=None,
maximize=True,
**kwargs,
):
"""
概要:
獲得関数スコアを計算します。
詳細説明:
候補点の予測平均値を獲得スコアとして返します。
引数:
:param X: 候補点の集合。
:type X: numpy.ndarray
:param mean: 候補点における予測平均。
:type mean: numpy.ndarray
:param std: 候補点における予測標準偏差。
:type std: numpy.ndarray
:param y_observed: 観測されたy値の集合。
:type y_observed: numpy.ndarray, オプション
:param observed_mask: 観測済みデータを示すブールマスク。
:type observed_mask: numpy.ndarray, オプション
:param maximize: スコアを最大化するかどうかを示すフラグ。
:type maximize: bool
:param kwargs: その他のキーワード引数。
:type kwargs: dict
戻り値:
:returns: 各候補点の獲得スコア。
:rtype: numpy.ndarray
"""
mean = np.asarray(mean, dtype=float).reshape(-1)
return mean if maximize else -mean
register_acquisition("mean", MeanAcquisition)
register_acquisition("max", MeanAcquisition)
# ============================================================
# True objective / simulator
# ============================================================
[ドキュメント]
def f_true(x: np.ndarray | float) -> np.ndarray | float:
"""
概要:
真の目的関数を定義します。
詳細説明:
1次元入力に対して、sin関数とcos関数の組み合わせで定義される値を返します。
これはベイズ最適化の対象となる未知の関数をシミュレートします。
引数:
:param x: 評価点。
:type x: numpy.ndarray または float
戻り値:
:returns: xにおける関数の値。
:rtype: numpy.ndarray または float
"""
return np.sin(2 * np.pi * x) + 0.3 * np.cos(4 * np.pi * x)
# ============================================================
# CLI
# ============================================================
[ドキュメント]
def build_parser() -> argparse.ArgumentParser:
"""
概要:
コマンドライン引数パーサーを構築します。
詳細説明:
argparseモジュールを使用して、スクリプトの実行に必要な様々な設定(モード、モデル、
反復回数、出力ファイルなど)をコマンドラインから受け取るためのパーサーを作成します。
引数: なし。
戻り値:
:returns: 設定されたArgumentParserインスタンス。
:rtype: argparse.ArgumentParser
"""
p = argparse.ArgumentParser(
description="1D GP Bayesian optimization animation using tkbo."
)
# Backward-compatible positional arguments:
# python gp_simulation_animation_tkbo.py ucb 20 out.gif
p.add_argument("mode_pos", nargs="?", default=None)
p.add_argument("nmaxiter_pos", nargs="?", type=int, default=None)
p.add_argument("outfile_pos", nargs="?", default=None)
p.add_argument(
"--mode",
default="ucb",
help="random/std/max/ucb/ei/pi/lcb/stein/entropy/grid",
)
p.add_argument(
"--model",
default="sklearn_gpr",
help="sklearn_gpr, physbo_gp, physbo, random, grid, custom",
)
p.add_argument(
"--surrogate",
default="sklearn_gpr",
help="Used when --model custom",
)
p.add_argument(
"--acquisition",
default="",
help="Override acquisition. Empty means inferred from --mode.",
)
p.add_argument("--nmaxiter", type=int, default=20)
p.add_argument("--outfile", default="")
p.add_argument("--n-grid", type=int, default=200)
p.add_argument("--noise-sigma", type=float, default=0.15)
p.add_argument("--random-seed", type=int, default=1)
p.add_argument("--kappa", type=float, default=2.0)
p.add_argument("--length-scale", type=float, default=0.2)
p.add_argument("--n-restarts-optimizer", type=int, default=5)
p.add_argument("--num-rand-basis", type=int, default=200)
p.add_argument("--interval", type=int, default=0)
p.add_argument("--save", type=int, default=1, choices=[0, 1])
p.add_argument("--show", type=int, default=1, choices=[0, 1])
p.add_argument("--maximize", type=int, default=1, choices=[0, 1])
p.add_argument(
"--show-score",
type=int,
default=1,
choices=[0, 1],
help="Show acquisition score plot in lower panel.",
)
p.add_argument(
"--score-color",
type=int,
default=1,
choices=[0, 1],
help="Color candidate points by normalized score on the upper panel.",
)
p.add_argument(
"--fps",
type=int,
default=2,
help="FPS when saving GIF.",
)
return p
[ドキュメント]
def normalize_args(args: argparse.Namespace) -> argparse.Namespace:
"""
概要:
コマンドライン引数を正規化します。
詳細説明:
位置引数として与えられた値を対応するオプション引数にマッピングし、
文字列引数を小文字に変換するなどして、内部処理に適した形式に整えます。
引数:
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値:
:returns: 正規化されたargparse.Namespaceオブジェクト。
:rtype: argparse.Namespace
"""
if args.mode_pos is not None:
args.mode = args.mode_pos
if args.nmaxiter_pos is not None:
args.nmaxiter = args.nmaxiter_pos
if args.outfile_pos is not None:
args.outfile = args.outfile_pos
args.mode = args.mode.lower()
args.model = args.model.lower()
args.surrogate = args.surrogate.lower()
args.acquisition = args.acquisition.lower() if args.acquisition else ""
return args
# ============================================================
# tkbo mapping
# ============================================================
[ドキュメント]
def infer_acquisition_from_mode(mode: str) -> str:
"""
概要:
モード名から対応する獲得関数名を推論します。
詳細説明:
標準的なモード名(例: std, max, ucb, eiなど)に基づいて、
tkboライブラリで使用される実際の獲得関数名を決定します。
引数:
:param mode: モード名を表す文字列。
:type mode: str
戻り値:
:returns: 対応する獲得関数名を表す文字列。
:rtype: str
例外:
:raises ValueError: 指定されたmodeが認識できない場合。
"""
mode = mode.lower()
if mode == "std":
return "entropy"
if mode == "max":
return "mean"
if mode in {"ucb", "ei", "pi", "lcb", "stein", "entropy"}:
return mode
if mode in {"random", "grid"}:
return "random"
raise ValueError(
"mode must be one of: random, std, max, ucb, ei, pi, lcb, stein, entropy, grid"
)
[ドキュメント]
def effective_model(args: argparse.Namespace) -> str:
"""
概要:
シミュレーションのモデルを決定します。
詳細説明:
モードがrandomまたはgridの場合、モデルもそれに合わせて設定されます。
それ以外の場合は、引数で指定されたモデルがそのまま使用されます。
引数:
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値:
:returns: シミュレーションに実際に使用されるモデル名。
:rtype: str
"""
if args.mode in {"random", "grid"}:
return args.mode
return args.model
[ドキュメント]
def acquisition_kwargs(acquisition: str, args: argparse.Namespace) -> dict:
"""
概要:
指定された獲得関数に必要な引数を返します。
詳細説明:
各獲得関数(UCB、LCBなど)が初期化時に受け入れる固有のパラメータを、
引数argsから抽出して辞書形式で返します。
引数:
:param acquisition: 獲得関数名を表す文字列。
:type acquisition: str
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値:
:returns: 獲得関数固有のパラメータを含む辞書。
:rtype: dict
"""
acq = acquisition.lower()
if acq in {"ucb", "lcb"}:
return {"acquisition__kappa": args.kappa}
return {}
[ドキュメント]
def make_optimizer(
X_candidates: np.ndarray,
y_all: np.ndarray,
args: argparse.Namespace,
*,
for_plot: bool = False,
):
"""
概要:
tkboオプティマイザインスタンスを作成します。
詳細説明:
コマンドライン引数と現在の観測データに基づいて、指定されたモデルと獲得関数を持つ
tkboのoptimizerを作成し、初期化します。
プロット用のオプティマイザが必要な場合は、特定のモデルが上書きされることがあります。
引数:
:param X_candidates: 候補点のNumpy配列。
:type X_candidates: numpy.ndarray
:param y_all: すべての候補点に対応するy値のNumpy配列(未観測点はnan)。
:type y_all: numpy.ndarray
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
:param for_plot: プロット用としてオプティマイザを作成するかどうかを示すブール値。
:type for_plot: bool
戻り値:
:returns: 初期化されたtkboオプティマイザインスタンス。
:rtype: tkbo.optimizer.Optimizer
例外:
:raises ValueError: 指定されたモデルが認識できない場合。
"""
observed_mask = ~np.isnan(y_all)
maximize = bool(args.maximize)
acquisition = args.acquisition or infer_acquisition_from_mode(args.mode)
model = effective_model(args)
if for_plot and model in {"random", "grid", "physbo"}:
model = "sklearn_gpr"
acquisition = "ucb"
acq_params = acquisition_kwargs(acquisition, args)
if model == "random":
opt = create_optimizer(
model="random",
random_seed=args.random_seed,
)
elif model == "grid":
opt = create_optimizer(model="grid")
elif model == "physbo":
score_mode = acquisition.upper()
if score_mode == "MEAN":
raise ValueError(
"--model physbo does not support acquisition=mean. "
"Use --model physbo_gp or sklearn_gpr."
)
if score_mode in {"UCB", "LCB", "STEIN", "ENTROPY", "RANDOM"}:
raise ValueError(
"--model physbo supports mainly EI/PI/TS. "
"Use --model physbo_gp for custom acquisitions."
)
opt = create_optimizer(
model="physbo",
score_mode=score_mode,
num_rand_basis=args.num_rand_basis,
interval=args.interval,
random_seed=args.random_seed,
maximize=maximize,
)
elif model == "physbo_gp":
opt = create_optimizer(
model="physbo_gp",
acquisition=acquisition,
surrogate__num_rand_basis=args.num_rand_basis,
surrogate__interval=args.interval,
surrogate__score_mode="EI",
surrogate__random_seed=args.random_seed,
**acq_params,
maximize=maximize,
)
elif model == "sklearn_gpr":
opt = create_optimizer(
model="sklearn_gpr",
acquisition=acquisition,
surrogate__alpha=args.noise_sigma**2,
surrogate__normalize_y=True,
surrogate__n_restarts_optimizer=args.n_restarts_optimizer,
surrogate__random_state=args.random_seed,
surrogate__length_scale=args.length_scale,
**acq_params,
maximize=maximize,
)
elif model == "custom":
opt = create_optimizer(
model="custom",
surrogate=args.surrogate,
acquisition=acquisition,
random_seed=args.random_seed,
**acq_params,
maximize=maximize,
)
else:
raise ValueError(
"--model must be sklearn_gpr, physbo_gp, physbo, random, grid, or custom"
)
opt.initialize(X_candidates, y=y_all, observed_mask=observed_mask)
return opt
[ドキュメント]
def choose_next_index(
X_candidates: np.ndarray,
y_all: np.ndarray,
rng: np.random.Generator,
args: argparse.Namespace,
) -> int:
"""
概要:
次に評価すべき候補点のインデックスを選択します。
詳細説明:
既に観測されたデータと未観測の候補点に基づいて、tkboオプティマイザまたは
乱数ジェネレータを使用して、次の観測点として最適なインデックスを選択します。
引数:
:param X_candidates: 候補点のNumpy配列。
:type X_candidates: numpy.ndarray
:param y_all: すべての候補点に対応するy値のNumpy配列(未観測点はnan)。
:type y_all: numpy.ndarray
:param rng: 乱数ジェネレータインスタンス。
:type rng: numpy.random.Generator
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値:
:returns: 次に評価すべき候補点のインデックス。
:rtype: int
例外:
:raises RuntimeError: 未観測の候補点が残っていない場合やtkboが候補を返さない場合。
"""
observed_mask = ~np.isnan(y_all)
unobserved = np.where(~observed_mask)[0]
if len(unobserved) == 0:
raise RuntimeError("No unobserved candidates remain.")
# First point is random.
if observed_mask.sum() == 0:
return int(rng.choice(unobserved))
# Avoid recreating RandomOptimizer with the same seed every frame.
if effective_model(args) == "random":
return int(rng.choice(unobserved))
opt = make_optimizer(X_candidates, y_all, args, for_plot=False)
result = opt.ask(n_points=1)
if len(result.indices) == 0:
raise RuntimeError("tkbo returned no candidate.")
return int(result.indices[0])
[ドキュメント]
def normalize_score_for_color(score: np.ndarray) -> np.ndarray:
"""
概要:
獲得スコアを色付けのために正規化します。
詳細説明:
獲得スコアの配列を受け取り、最小値と最大値に基づいて0から1の範囲に正規化します。
これにより、プロットでの色付けに利用できます。NaN値はそのまま維持されます。
引数:
:param score: 獲得スコアのNumpy配列。
:type score: numpy.ndarray
戻り値:
:returns: 0から1に正規化されたスコアのNumpy配列。
:rtype: numpy.ndarray
"""
score = np.asarray(score, dtype=float).reshape(-1)
out = np.full_like(score, np.nan, dtype=float)
finite = np.isfinite(score)
if not np.any(finite):
return out
smin = np.nanmin(score[finite])
smax = np.nanmax(score[finite])
if smax <= smin:
out[finite] = 0.5
else:
out[finite] = (score[finite] - smin) / (smax - smin)
return out
[ドキュメント]
def safe_score_for_plot(score: np.ndarray, observed_mask: np.ndarray) -> np.ndarray:
"""
概要:
プロット用に獲得スコアを整形します。
詳細説明:
観測済みの点に対するスコアをNaNに設定し、未観測の点のみのスコアを返します。
これにより、プロット時に観測済みの点がスコアによって表示されないようにします。
引数:
:param score: 獲得スコアのNumpy配列。
:type score: numpy.ndarray
:param observed_mask: 観測済みデータを示すブールマスク配列。
:type observed_mask: numpy.ndarray
戻り値:
:returns: 観測済み点のスコアがNaNに設定されたスコアのNumpy配列。
:rtype: numpy.ndarray
"""
score = np.asarray(score, dtype=float).reshape(-1).copy()
score[observed_mask] = np.nan
finite = np.isfinite(score)
if not np.any(finite):
return score
return score
[ドキュメント]
def compute_plot_posterior_and_score(
X_candidates: np.ndarray,
y_all: np.ndarray,
args: argparse.Namespace,
):
"""
概要:
プロット用の後方分布の平均、標準偏差、獲得スコアを計算します。
詳細説明:
現在の観測データに基づいてtkboオプティマイザを作成し、すべての候補点に対する
ガウス過程の予測平均、標準偏差、および獲得スコアを計算します。
引数:
:param X_candidates: 候補点のNumpy配列。
:type X_candidates: numpy.ndarray
:param y_all: すべての候補点に対応するy値のNumpy配列(未観測点はnan)。
:type y_all: numpy.ndarray
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値:
:returns: 予測平均、標準偏差、獲得スコアのNumpy配列のタプル。
:rtype: tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]
"""
observed_mask = ~np.isnan(y_all)
plot_opt = make_optimizer(X_candidates, y_all, args, for_plot=True)
mu, std = plot_opt.predict(X_candidates, return_std=True)
mu = np.asarray(mu, dtype=float).reshape(-1)
std = np.asarray(std, dtype=float).reshape(-1)
score = None
try:
score = plot_opt.acquisition(X_candidates)
score = np.asarray(score, dtype=float).reshape(-1)
score = safe_score_for_plot(score, observed_mask)
except Exception:
score = np.full(len(X_candidates), np.nan, dtype=float)
return mu, std, score
# ============================================================
# Main animation
# ============================================================
[ドキュメント]
def run(args: argparse.Namespace) -> None:
"""
概要:
ベイズ最適化のシミュレーションを実行し、アニメーションを生成します。
詳細説明:
真の関数f_trueからのデータ収集、ガウス過程の更新、次の評価点の選択を
args.nmaxiterで指定された回数だけ繰り返します。
各ステップの状態を可視化し、アニメーションとして表示または保存します。
引数:
:param args: コマンドライン引数を格納したNamespaceオブジェクト。
:type args: argparse.Namespace
戻り値: なし。
"""
rng = np.random.default_rng(args.random_seed)
plt.rcParams["font.family"] = "MS Gothic"
plt.rcParams["axes.unicode_minus"] = False
x_plot = np.linspace(0.0, 1.0, args.n_grid)
X_candidates = x_plot.reshape(-1, 1)
y_true = f_true(x_plot)
y_all = np.full(args.n_grid, np.nan, dtype=float)
if args.show_score:
fig, (ax, ax_score) = plt.subplots(
2,
1,
figsize=(8, 7),
sharex=True,
gridspec_kw={"height_ratios": [3, 1]},
)
else:
fig, ax = plt.subplots(figsize=(8, 5))
ax_score = None
ax.set_ylabel("y")
ax.plot(x_plot, y_true, "k--", label="真の関数 f(x)")
candidate_color_scatter = None
if args.score_color:
candidate_color_scatter = ax.scatter(
[],
[],
c=[],
cmap="viridis",
s=16,
alpha=0.45,
vmin=0.0,
vmax=1.0,
label="候補点 score 色",
)
scatter = ax.scatter([], [], color="black", s=35, label="観測データ")
next_marker = ax.scatter([], [], color="red", s=90, marker="*", label="次の評価点")
mean_line, = ax.plot([], [], color="C0", label="GP 予測平均")
uncert_band = None
ax.legend(loc="upper right")
ax.set_xlim(0, 1)
ax.set_ylim(-2.0, 2.0)
if ax_score is not None:
ax_score.set_xlabel("x")
ax_score.set_ylabel("score")
score_line, = ax_score.plot([], [], color="C3", label="acquisition score")
score_next_marker = ax_score.scatter(
[],
[],
color="red",
s=70,
marker="*",
label="selected",
)
ax_score.grid(True, alpha=0.3)
ax_score.legend(loc="upper right")
else:
ax.set_xlabel("x")
score_line = None
score_next_marker = None
print("Configuration")
print(f" mode = {args.mode}")
print(f" model = {effective_model(args)}")
print(f" surrogate = {args.surrogate if args.model == 'custom' else '(model default)'}")
print(f" acquisition = {args.acquisition or infer_acquisition_from_mode(args.mode)}")
print(f" nmaxiter = {args.nmaxiter}")
print(f" show_score = {args.show_score}")
print(f" score_color = {args.score_color}")
def update(frame: int):
nonlocal uncert_band, y_all
idx_new = choose_next_index(X_candidates, y_all, rng, args)
x_new = x_plot[idx_new]
y_new = f_true(x_new) + rng.normal(0.0, args.noise_sigma)
y_all[idx_new] = y_new
observed_mask = ~np.isnan(y_all)
x_train = x_plot[observed_mask]
y_train = y_all[observed_mask]
mu, std, score = compute_plot_posterior_and_score(
X_candidates,
y_all,
args,
)
scatter.set_offsets(np.column_stack([x_train, y_train]))
next_marker.set_offsets(np.array([[x_new, y_new]], dtype=float))
mean_line.set_data(x_plot, mu)
if uncert_band is not None:
uncert_band.remove()
uncert_band = ax.fill_between(
x_plot,
mu - 2 * std,
mu + 2 * std,
color="C0",
alpha=0.2,
)
if candidate_color_scatter is not None:
color_score = normalize_score_for_color(score)
y_color = np.full_like(x_plot, ax.get_ylim()[0] + 0.08)
candidate_color_scatter.set_offsets(
np.column_stack([x_plot, y_color])
)
candidate_color_scatter.set_array(color_score)
if ax_score is not None:
score_line.set_data(x_plot, score)
if np.isfinite(score[idx_new]):
score_next_marker.set_offsets(
np.array([[x_new, score[idx_new]]], dtype=float)
)
else:
score_next_marker.set_offsets(np.empty((0, 2)))
finite = np.isfinite(score)
if np.any(finite):
smin = float(np.nanmin(score[finite]))
smax = float(np.nanmax(score[finite]))
if smax <= smin:
pad = 1.0 if smax == 0.0 else abs(smax) * 0.1
ax_score.set_ylim(smin - pad, smax + pad)
else:
pad = 0.08 * (smax - smin)
ax_score.set_ylim(smin - pad, smax + pad)
else:
ax_score.set_ylim(-1.0, 1.0)
ax.set_title(
f"mode={args.mode} | model={effective_model(args)} | "
f"n={observed_mask.sum()} | next index={idx_new}, x={x_new:.3f}"
)
print(
f"step={frame + 1:03d}, "
f"index={idx_new:03d}, "
f"x={x_new:.6f}, "
f"y={y_new:.6f}"
)
artists = [scatter, next_marker, mean_line, uncert_band]
if candidate_color_scatter is not None:
artists.append(candidate_color_scatter)
if score_line is not None:
artists.extend([score_line, score_next_marker])
return artists
anim = FuncAnimation(
fig,
update,
frames=args.nmaxiter,
interval=500,
blit=False,
repeat=False,
)
if args.save:
outfile = args.outfile
if not outfile:
outfile = (
f"gp_animation_{args.mode}_{effective_model(args)}_"
f"{args.nmaxiter}.gif"
)
outfile = Path(outfile)
writer = PillowWriter(fps=args.fps)
anim.save(outfile, writer=writer)
print(f"Saved animation to {outfile}")
plt.tight_layout()
if args.show:
plt.show()
else:
plt.close(fig)
[ドキュメント]
def main() -> None:
"""
概要:
スクリプトのエントリーポイントです。
詳細説明:
コマンドライン引数を解析し、run関数を呼び出してシミュレーションを開始します。
エラーが発生した場合はトレースバックを出力します。
引数: なし。
戻り値: なし。
"""
parser = build_parser()
args = normalize_args(parser.parse_args())
try:
run(args)
except Exception:
traceback.print_exc()
raise
if __name__ == "__main__":
main()