"""
ガウス過程に基づくベイズ最適化を実行するCLIアプリケーション。
このスクリプトは、PHYSBOライブラリを利用してガウス過程ベースのベイズ最適化(BO)を行います。
与えられたデータファイル(ExcelまたはCSV)から記述子と目的関数を読み込み、
訓練データに基づいてモデルを構築し、未探索のデータポイントに対して最適な候補を探索します。
予測結果はExcelファイルに出力され、各種グラフ(等高線図、パレート図、予測値/獲得関数)も生成されます。
主な機能:
- データファイルからの記述子と目的関数の読み込み。
- 目的関数の最適化モード(最大化、最小化、目標値への収束)に応じたデータ変換。
- 記述子の標準化。
- PHYSBOを用いたベイズ探索(獲得関数としてEI, PI, TSをサポート)。
- 探索結果(最良候補、事後平均、事後標準偏差)の保存。
- 探索過程と予測結果の可視化(等高線図、パレート図、予測値/獲得関数のプロット)。
- グラフ上でのデータポイントクリックによる詳細情報の表示。
- コマンドライン引数によるパラメータ設定。
関連リンク:
:doc:`bayes_gp_plain_shimizu_usage`
"""
import sys
from sys import exit
import os
import time
import re
import numpy as np
from numpy import sqrt, log
import openpyxl
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
try:
import physbo
except:
print("")
print("#########################################################")
print("# IMPORT ERROR for physbo")
print("# Use python 3.6 environment and install physbo")
print("# For anaconda and if you made py36 as python 3.6 virtual environment:")
print("# > conda activate py36")
print("# > pip install physbo")
print("# or")
print("# > pip3 install physbo")
print("# or")
print("# > python -m pip3 install physbo")
print("#########################################################")
exit()
#===================================================================================
"""
Perform Bayes optimization based on Gauss process using PHYSBO
"""
#===================================================================================
#===================================================================================
usage_str = '''
f"(i) usage: python {sys.argv[0]} infile max_num_probes num_rand_basis score_mode interval standardize"
f" max_num_probes: Use a number to reach convergence"
f" num_rand_basis: Use a large number so as to reproduce training data"
f" score_mode : [EI|PI|TS]"
f" interval : Number of cycle to update hyper parameters"
f" standardize : Flag to standardize descrptors [0|1]"
f" ex: python {sys.argv[0]} {cparams.infile} {cparams.max_num_probes} {cparams.num_rand_basis} {cparams.score_mode} {cparams.interval}"
'''[1:-1]
#===================================================================================
#==============================================
# Global variables
#==============================================
physbo_url = "https://www.pasums.issp.u-tokyo.ac.jp/physbo/"
citation_url = "https://issp-center-dev.github.io/PHYSBO/manual/master/en/introduction.html"
prog_name = 'Bayes/GP CLI'
version = [1, 2, 1]
#===================================================================================
# Basic functions / classes
#===================================================================================
[ドキュメント]
def pint(s, strict = True, defval = 0):
"""
概要: 文字列を安全に整数に変換します。
詳細説明:
指定された文字列 `s` を整数に変換します。変換に失敗した場合、
`defval` で指定されたデフォルト値を返します。
:param s: 変換対象の文字列。
:type s: str
:param strict: 厳密な変換を適用するかどうか (現在未使用)。
:type strict: bool
:param defval: 変換失敗時に返すデフォルト値。
:type defval: int
:returns: 変換された整数、またはデフォルト値。
:rtype: int
"""
try:
return int(s)
except:
return defval
[ドキュメント]
def pfloat(s, strict = True, defval = 0.0):
"""
概要: 文字列を安全に浮動小数点数に変換します。
詳細説明:
指定された文字列 `s` を浮動小数点数に変換します。変換に失敗した場合、
`defval` で指定されたデフォルト値を返します。
:param s: 変換対象の文字列。
:type s: str
:param strict: 厳密な変換を適用するかどうか (現在未使用)。
:type strict: bool
:param defval: 変換失敗時に返すデフォルト値。
:type defval: float
:returns: 変換された浮動小数点数、またはデフォルト値。
:rtype: float
"""
try:
return float(s)
except:
return defval
[ドキュメント]
def pintfloat(s, strict = True, defval = 0.0):
"""
概要: 文字列を安全に整数または浮動小数点数に変換します。
詳細説明:
指定された文字列 `s` を最初に整数に変換を試みます。
整数への変換に失敗した場合、次に浮動小数点数への変換を試みます。
どちらの変換も失敗した場合、`defval` で指定されたデフォルト値を返します。
:param s: 変換対象の文字列。
:type s: str
:param strict: 厳密な変換を適用するかどうか (現在未使用)。
:type strict: bool
:param defval: 変換失敗時に返すデフォルト値。
:type defval: float
:returns: 変換された整数または浮動小数点数、またはデフォルト値。
:rtype: int | float
"""
try:
return int(s)
except:
pass
try:
return float(s)
except:
return defval
[ドキュメント]
def getarg(position, defval = None):
"""
概要: コマンドライン引数を安全に取得します。
詳細説明:
`sys.argv` から指定された `position` のコマンドライン引数を取得します。
指定された位置に引数が存在しない場合、`defval` で指定されたデフォルト値を返します。
:param position: 取得する引数のインデックス (0はスクリプト名)。
:type position: int
:param defval: 引数が存在しない場合に返すデフォルト値。
:type defval: str | None
:returns: 指定された位置の引数、またはデフォルト値。
:rtype: str | None
"""
try:
return sys.argv[position]
except:
return defval
[ドキュメント]
def getfloatarg(position, defval = None):
"""
概要: コマンドライン引数を浮動小数点数として取得します。
詳細説明:
`getarg` を使用して指定された位置のコマンドライン引数を文字列として取得し、
`pfloat` を使用してそれを浮動小数点数に変換します。
:param position: 取得する引数のインデックス (0はスクリプト名)。
:type position: int
:param defval: 引数が存在しない場合や変換失敗時に返すデフォルト値。
:type defval: float | None
:returns: 変換された浮動小数点数、またはデフォルト値。
:rtype: float | None
"""
return pfloat(getarg(position, defval))
[ドキュメント]
def getintarg(position, defval = None):
"""
概要: コマンドライン引数を整数として取得します。
詳細説明:
`getarg` を使用して指定された位置のコマンドライン引数を文字列として取得し、
`pint` を使用してそれを整数に変換します。
:param position: 取得する引数のインデックス (0はスクリプト名)。
:type position: int
:param defval: 引数が存在しない場合や変換失敗時に返すデフォルト値。
:type defval: int | None
:returns: 変換された整数、またはデフォルト値。
:rtype: int | None
"""
return pint(getarg(position, defval))
[ドキュメント]
class tkObject:
"""
概要: 汎用的な属性アクセスと管理のための基底クラスです。
詳細説明:
このクラスは、オブジェクトの属性を辞書形式で取得、設定、更新、および表示するための
基本的なユーティリティメソッドを提供します。他のクラスの基底として機能することを
目的としています。
"""
[ドキュメント]
def get(self, key, defval = None):
"""
概要: 指定されたキーの属性値を取得します。
詳細説明:
オブジェクトの内部辞書 (`__dict__`) から `key` に対応する値を取得します。
`key` が存在しない場合、`defval` で指定されたデフォルト値を返します。
:param key: 取得する属性のキー。
:type key: str
:param defval: キーが存在しない場合に返すデフォルト値。
:type defval: any | None
:returns: 属性値、またはデフォルト値。
:rtype: any
"""
return self.__dict__.get(key, defval)
[ドキュメント]
def set_attribute(self, key, val):
"""
概要: 指定されたキーに属性値を設定します。
詳細説明:
オブジェクトの内部辞書 (`__dict__`) に `key` と `val` のペアを追加または更新します。
:param key: 設定する属性のキー。
:type key: str
:param val: 設定する属性の値。
:type val: any
:returns: None
:rtype: None
"""
self.__dict__[key] = val
[ドキュメント]
def update(self, **args):
"""
概要: 複数の属性を辞書形式で更新します。
詳細説明:
キーワード引数として渡された辞書を使って、オブジェクトの内部辞書 (`__dict__`) を更新します。
:param args: キーと値のペアを含むキーワード引数。
:type args: dict
:returns: None
:rtype: None
"""
self.__dict__.update(args)
[ドキュメント]
def printinf(self, app):
"""
概要: オブジェクトの全属性とその値をコンソールに表示します。
詳細説明:
オブジェクトのすべての属性(キーと値のペア)を整形して標準出力に表示します。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:returns: None
:rtype: None
"""
print("Parameters:")
for key in self.__dict__.keys():
print(f" {key}: {self.__dict__[key]}")
[ドキュメント]
class tkParams(tkObject):
"""
概要: アプリケーションのパラメータを管理するためのクラスです。
詳細説明:
`tkObject` を継承し、コマンドライン引数を初期化時に保持する機能を追加しています。
アプリケーション全体で共有される設定やオプションを格納するために使用されます。
"""
def __init__(self, parameter_file = None, **args):
"""
概要: tkParamsオブジェクトを初期化します。
詳細説明:
`tkObject` の初期化に加え、`sys.argv` を内部変数 `_argv` に格納します。
また、キーワード引数として渡されたパラメータでオブジェクトの属性を更新します。
:param parameter_file: パラメータファイルへのパス (現在未使用)。
:type parameter_file: str | None
:param args: 初期化時に設定する追加のパラメータ。
:type args: dict
:returns: None
:rtype: None
"""
super(tkObject, self).__init__(**args)
self._argv = sys.argv
self.update(**args)
[ドキュメント]
def printinf(self, app):
"""
概要: パラメータオブジェクトの全属性とその値をコンソールに表示します。
詳細説明:
親クラス `tkObject` の `printinf` メソッドと同様に、
このパラメータオブジェクトのすべての属性を整形して標準出力に表示します。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:returns: None
:rtype: None
"""
print("Parameters:")
for key in self.__dict__.keys():
print(f" {key}: {self.__dict__[key]}")
[ドキュメント]
class tkApplication(tkObject):
"""
概要: アプリケーション全体のコンテキストとユーティリティを提供するクラスです。
詳細説明:
このクラスは、アプリケーションのパラメータ、コマンドライン引数、スクリプトパス、
使用法文字列、グローバル/ローカル変数への参照を保持します。
また、`print` メソッドを通じて標準化された出力機能を提供します。
"""
def __init__(self, _globals = None, locals = None, **args):
"""
概要: tkApplicationオブジェクトを初期化します。
詳細説明:
親クラス `tkObject` を初期化し、`tkParams` のインスタンスを `params` 属性として設定します。
コマンドライン引数 (`sys.argv`)、スクリプトパス、および使用法文字列を初期化します。
また、キーワード引数で渡された追加の属性でオブジェクトを更新します。
:param _globals: グローバル名前空間への参照 (現在未使用)。
:type _globals: dict | None
:param locals: ローカル名前空間への参照 (現在未使用)。
:type locals: dict | None
:param args: 初期化時に設定する追加の属性。
:type args: dict
:returns: None
:rtype: None
"""
super().__init__()
self.params = tkParams()
self.argv = sys.argv
self.script_path = sys.argv[0]
self.usage_str = None
self.globals = globals
self.locals = locals
self.update(**args)
[ドキュメント]
def get_argv(self):
"""
概要: コマンドライン引数とその数を取得します。
詳細説明:
`sys.argv` からプログラムに渡されたすべてのコマンドライン引数のリストと、
その引数の総数をタプルとして返します。
:returns: コマンドライン引数のリストと引数の数。
:rtype: tuple[list[str], int]
"""
return sys.argv, len(sys.argv)
[ドキュメント]
def get_params(self):
"""
概要: アプリケーションのパラメータオブジェクトを取得します。
詳細説明:
`cparams` 属性がまだ設定されていない場合、新しく `tkParams` のインスタンスを作成し、
それを `cparams` 属性として設定してから返します。
:returns: アプリケーションのパラメータオブジェクト。
:rtype: tkParams
"""
if self.get('cparams', None) is None:
self.cparams = tkParams()
return self.cparams
[ドキュメント]
def print(self, *args, **kwargs):
"""
概要: メッセージをコンソールに出力します。
詳細説明:
Pythonの組み込み `print` 関数と同じインターフェースを提供します。
このメソッドを介して出力を行うことで、将来的にログ機能や出力制御を
一元的に追加することが可能になります。
:param args: 出力する位置引数。
:type args: tuple
:param kwargs: `print` 関数に渡すキーワード引数 (例: `end`, `sep`, `file`)。
:type kwargs: dict
:returns: None
:rtype: None
"""
print(*args, **kwargs)
#===================================================================================
# Other functions
#===================================================================================
[ドキュメント]
def citation(app):
"""
概要: PHYSBOの引用に関する情報を表示します。
詳細説明:
PHYSBOライブラリの使用を引用するべき参照先のURLをコンソールに出力します。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:returns: None
:rtype: None
"""
app.print("")
app.print( "====================================================================================")
app.print("Please cite designated referneces for PHYSBO. See")
app.print(f" Citation: {citation_url}")
app.print( "====================================================================================")
[ドキュメント]
def usage(app):
"""
概要: プログラムの正しい使用法と引用情報を表示します。
詳細説明:
コマンドライン引数の使用方法を示す文字列 `usage_str` を整形して出力し、
その後 `citation` 関数を呼び出して引用情報を表示します。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:returns: None
:rtype: None
"""
cparams = app.get_params()
for s in usage_str.split('\n'):
cmd = 'app.print({})'.format(s.rstrip())
eval(cmd)
citation(app)
[ドキュメント]
def terminate(app, message = None, usage = None, pause = False):
"""
概要: プログラムを終了します。
詳細説明:
オプションで終了メッセージを表示し、使用法を表示し、
ユーザーがEnterキーを押すまで一時停止した後、プログラムを終了します。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:param message: 終了時に表示するメッセージ。
:type message: str | None
:param usage: `usage` 関数を呼び出すかどうか。`callable` が渡された場合、それを呼び出します。
:type usage: callable | None
:param pause: `True` の場合、終了前にユーザーがEnterキーを押すまで待機します。
:type pause: bool
:returns: None
:rtype: None
"""
if message is not None:
app.print("")
app.print(message)
if usage:
app.print("")
usage(app)
app.print("")
if pause:
input("Press ENTER to terminate >> ")
app.print("")
exit()
[ドキュメント]
def initialize():
"""
概要: アプリケーションとパラメータオブジェクトを初期化します。
詳細説明:
`tkApplication` と `tkParams` のインスタンスを作成し、
デフォルトのパラメータ値を設定します。これらは、プログラムの実行中に
コマンドライン引数によって更新される可能性があります。
:returns: 初期化されたアプリケーションオブジェクトとパラメータオブジェクトのタプル。
:rtype: tuple[tkApplication, tkParams]
"""
#================================
# Global variables
#================================
app = tkApplication(usage_str = usage_str)
argv, narg = app.get_argv()
cparams = app.get_params()
cparams.debug = 0
cparams.print_level = 0
cparams.infile = 'data_simple.xlsx'
# cparams.infile = 'data_simple.csv'
cparams.nx_2D = 11
cparams.ny_2D = 11
cparams.standardize = 1
cparams.num_search_each = 1
cparams.max_num_probes = 1
cparams.num_rand_basis = 200 # -1 for non-approximated
cparams.score_mode = 'EI'
cparams.interval = 0
return app, cparams
[ドキュメント]
def replace_path(path, template, ext_dict):
"""
概要: ファイルパスの要素をテンプレート文字列に置き換えます。
詳細説明:
指定された `path` から、フルパス、ディレクトリ名、ファイル名、ファイル本体、
拡張子を抽出し、それらを `ext_dict` に追加します。
その後、`template` 文字列内の対応するプレースホルダーをこれらの値で置き換えて、
新しいパス文字列を生成して返します。
:param path: 処理対象のファイルパス。
:type path: str
:param template: 置き換えに使用するテンプレート文字列 (例: '{filebody}-predict.xlsx')。
:type template: str
:param ext_dict: テンプレートに渡す追加の辞書。抽出されたパス要素がここに追加されます。
:type ext_dict: dict
:returns: 置き換えが適用された新しいパス文字列。
:rtype: str
"""
fullpath = os.path.abspath(path)
filename = os.path.basename(fullpath)
dirname = os.path.dirname(fullpath)
filebody, ext = os.path.splitext(filename)
ext_dict.update({
"fullpath": fullpath,
"dirname": dirname,
"filename": filename,
"filebody": filebody,
"ext": ext
})
return template.format(**ext_dict)
[ドキュメント]
def get_dirname(path):
"""
概要: 指定されたパスのディレクトリ名を取得します。
詳細説明:
`os.path.abspath` と `os.path.dirname` を使用して、
指定されたファイルパスの絶対ディレクトリ名を取得します。
:param path: ディレクトリ名を取得する対象のファイルパス。
:type path: str
:returns: ディレクトリ名。
:rtype: str
"""
fullpath = os.path.abspath(path)
dirname = os.path.dirname(fullpath)
return dirname
[ドキュメント]
def update_vars(app, cparams):
"""
概要: コマンドライン引数に基づいてアプリケーションのパラメータを更新します。
詳細説明:
`sys.argv` からコマンドライン引数を読み込み、`cparams` オブジェクトの
`infile`, `max_num_probes`, `num_rand_basis`, `score_mode`, `interval`,
`standardize` などのパラメータを更新します。
また、出力結果ファイルと予測結果ファイルのパスを生成し、`cparams` に設定します。
:param app: アプリケーションオブジェクト。
:type app: tkApplication
:param cparams: 更新するパラメータオブジェクト。
:type cparams: tkParams
:returns: None
:rtype: None
"""
argv, narg = app.get_argv()
# if narg <= 1:
# app.terminate(usage = usage)
cparams.infile = getarg (1, cparams.infile)
cparams.max_num_probes = getintarg(2, cparams.max_num_probes)
cparams.num_rand_basis = getintarg(3, cparams.num_rand_basis)
cparams.score_mode = getarg (4, cparams.score_mode)
cparams.interval = getintarg(5, cparams.interval)
cparams.standardize = getintarg(6, cparams.standardize)
dirname = get_dirname(cparams.infile)
outresfile = replace_path(cparams.infile, 'search_result{i}.npz', {'i': '{i}'})
cparams.outresfile = os.path.join(dirname, outresfile)
outfile = replace_path(cparams.infile, '{filebody}-predict{i}.xlsx', {'i': '{i}'})
cparams.outfile = os.path.join(dirname, outfile)
#====================================================
# Graph parameters
#====================================================
# cparams.figsize = (4, 4)
cparams.figsize = (8, 6)
cparams.fontsize = 12
cparams.legend_fontsize = 8
[ドキュメント]
def read_data_file(app, infile):
"""
概要: 指定されたファイルからデータをPandas DataFrameとして読み込みます。
詳細説明:
ファイルパス `infile` が `.xlsx` 拡張子を持つ場合、`openpyxl` エンジンを使用してExcelファイルを読み込みます。
それ以外の場合はCSVファイルとして読み込みます。
ファイルの読み込みに失敗した場合、エラーメッセージをコンソールに出力し `None` を返します。
:param app: アプリケーションオブジェクト (エラー出力に使用)。
:type app: tkApplication
:param infile: 読み込むデータファイルへのパス。
:type infile: str
:returns: 読み込まれたデータを格納するPandas DataFrame、またはエラーが発生した場合は `None`。
:rtype: pd.DataFrame | None
"""
error = False
if '.xlsx' in infile:
try:
df = pd.read_excel(infile, engine = 'openpyxl')
except:
error = True
else:
try:
df = pd.read_csv(infile)
except:
error = True
if error:
app.print(f"Error in load_data(): Can not read [{infile}]")
return None
# terminate(app, f"Error in load_data(): Can not read [{infile}]", usage = usage)
return df
[ドキュメント]
def split_raw_data(df_original):
"""
概要: データフレームの列ヘッダーを解析し、目的変数と記述子に分割します。
詳細説明:
`df_original` の列名を走査し、ヘッダーに埋め込まれた制御コード(例: `=value:`, `max:`, `min:`)
に基づいて目的変数を識別します。制御コードは目的変数の最適化モード(特定の数値への収束、最大化、最小化)
とパレート図のインデックスを示します。
制御コードのない列は記述子として扱われます。
結果は `tkParams` オブジェクトとして、目的変数と記述子の情報(ラベル、モード、値など)に分割されて返されます。
:param df_original: 解析する元のデータフレーム。
:type df_original: pd.DataFrame
:returns: 目的変数に関する情報 (`targets`) と記述子に関する情報 (`descriptors`) を含む `tkParams` オブジェクトのタプル。
:rtype: tuple[tkParams, tkParams]
"""
targets = tkParams()
descriptors = tkParams()
targets.labels0 = []
targets.labels = []
targets.modes = []
targets.indexes = []
targets.values = []
descriptors.labels0 = []
columns = df_original.columns.to_list()
# ヘッダーの制御コードにより、記述子と目的関数、変換モードを抽出
idx_target_x = None
idx_target_y = None
for s in columns:
# 数値最適化
if re.match(r'=([+-\.\deE]+):', s, flags = re.IGNORECASE):
m = re.match(r'=([+-\.\deE]+):(.*)', s, flags = re.IGNORECASE)
targets.labels0.append(s)
targets.labels.append(m.groups()[1])
targets.indexes.append(None)
targets.modes.append('value')
targets.values.append(pfloat(m.groups()[0]))
# 最大化
elif re.match(r'max\d*:', s, flags = re.IGNORECASE):
m = re.match(r'max(\d*):(.*)', s, flags = re.IGNORECASE)
targets.labels0.append(s)
targets.labels.append(m.groups()[1])
index = pint(m.groups()[0], defval = None)
if index is None or index == '':
if idx_target_x is None:
index = 0
idx_target_x = index
elif idx_target_y is None:
index = 1
idx_target_y = index
else:
index = 999999
else:
index -= 1
targets.indexes.append(index)
targets.modes.append('max')
targets.values.append(None)
elif re.match(r'[to]\d*:', s, flags = re.IGNORECASE):
m = re.match(r'[to](\d*):(.*)', s, flags = re.IGNORECASE)
targets.labels0.append(s)
targets.labels.append(m.groups()[1])
index = pint(m.groups()[0], defval = None)
if index is None or index == '':
if idx_target_x is None:
index = 0
idx_target_x = index
elif idx_target_y is None:
index = 1
idx_target_y = index
else:
index = 999999
else:
index -= 1
targets.indexes.append(index)
targets.modes.append('max')
targets.values.append(None)
# 最小化
elif re.match(r'min\d*:', s, flags = re.IGNORECASE):
m = re.match(r'min(\d*):(.*)', s, flags = re.IGNORECASE)
targets.labels0.append(s)
targets.labels.append(m.groups()[1])
index = pint(m.groups()[0], defval = None)
if index is None or index == '':
if idx_target_x is None:
index = 0
idx_target_x = index
elif idx_target_y is None:
index = 1
idx_target_y = index
else:
index = 999999
else:
index -= 1
targets.indexes.append(index)
targets.modes.append('min')
targets.values.append(None)
# 記述子、目的関数から除外
elif s is None or s == '' or re.match(r'\-', s):
pass
else:
# 記述子に追加
descriptors.labels0.append(s)
ntargets = len(targets.labels0)
if ntargets == 0:
targets.labels0.append(columns[0])
targets.labels.append(columns[0])
targets.indexes.append(0)
targets.modes.append('max')
targets.values.append(None)
ntargets = 1
# 記述子に目的関数と同じ列が入っている場合、削除
# print( " descriptors:", descriptors.labels0)
try:
for i in range(ntargets):
descriptors.labels0.remove(targets.labels0[i])
except:
pass
return targets, descriptors
[ドキュメント]
def get_plot_descriptors(descriptors):
"""
概要: プロットに使用する記述子のインデックスとラベルを決定します。
詳細説明:
与えられた記述子のリスト `descriptors` から、プロットのX、Y、Z軸に
割り当てる記述子のインデックスとラベルを抽出します。
記述子名に `X:`, `Y:`, `Z:` の形式の制御子が含まれている場合、
それらを解析してプロット軸を決定し、制御子を除いたクリーンなラベルを生成します。
記述子の数が少ない場合はデフォルトの割り当てが行われます。
:param descriptors: 制御子を含む記述子のリスト。
:type descriptors: list[str]
:returns: プロット軸のインデックス、プロット軸のラベル、および制御子を除いた記述子ラベルのリストのタプル。
:rtype: tuple[dict[str, int], dict[str, str], list[str]]
"""
if len(descriptors) == 1:
plot_indexes = {'x': 0}
plot_labels = {'x': descriptors[0]}
elif len(descriptors) == 2:
plot_indexes = {'x': 0, 'y': 1}
plot_labels = {'x': descriptors[0], 'y': descriptors[1]}
else:
plot_indexes = {'x': 0, 'y': 1, 'z': 2}
plot_labels = {'x': descriptors[0], 'y': descriptors[1], 'z': descriptors[2]}
descriptors_label = descriptors # DO NOT MODIFY (Rule 1: existing logic)
for i in range(len(descriptors)):
m = re.search(r'([xyzXYZ]):\s*(.*)\s*$', descriptors[i])
if m:
descriptors_label[i] = m.groups()[1]
var = m.groups()[0].lower()
plot_indexes[var] = i
plot_labels[var] = descriptors_label[i]
return plot_indexes, plot_labels, descriptors_label
[ドキュメント]
def load_data(app, cparams):
"""
概要: データファイルを読み込み、目的変数と記述子を準備します。
詳細説明:
`read_data_file` を使用して指定されたデータファイルを読み込み、
`split_raw_data` を使用して目的変数と記述子に分割します。
データフレームからNaN(欠損値)を含む行を削除し、NumPy配列に変換します。
目的変数はその最適化モード(最小化、目標値)に応じて変換(例: 最小化の場合は符号反転、目標値最適化の場合は二乗誤差)されます。
処理されたデータは `tkParams` オブジェクトとして、訓練データと全データに分けて返されます。
:param app: アプリケーションオブジェクト (出力に使用)。
:type app: tkApplication
:param cparams: パラメータオブジェクト (`infile` を含む)。
:type cparams: tkParams
:returns: 元のDataFrame、目的変数パラメータ、記述子パラメータ、全データパラメータ、訓練データパラメータのタプル。
ファイルの読み込みに失敗した場合は `None` が返されます。
:rtype: tuple[pd.DataFrame | None, tkParams | None, tkParams | None, tkParams | None, tkParams | None]
"""
print("")
print(f"Read data from [{cparams.infile}]")
df_original = read_data_file(app, cparams.infile)
if df_original is None:
return None, None, None, None, None
targets_params, descriptors = split_raw_data(df_original)
ntargets = len(targets_params.labels0)
print( " descriptors:", descriptors.labels0)
# print(f" 343 ntarget={ntargets}")
print( " objective functions:")
for i in range(ntargets):
if targets_params.values[i] is None:
print(f" i={i} Pareto plot index={targets_params.indexes[i]} label={targets_params.labels[i]} mode={targets_params.modes[i]}")
else:
print(f" i={i} Pareto plot index={targets_params.indexes[i]} label={targets_params.labels[i]} mode={targets_params.modes[i]} value={targets_params.values[i]}")
t_all = df_original[targets_params.labels0]
x_all = df_original[descriptors.labels0]
# print(f"{t_all=}")
# print(f"{x_all=}")
# exit()
# target functionがnanでないデータを学習データとして抽出
df2 = df_original.dropna(how = 'any')
idx_train = df2.index.to_numpy()
# df2 = df2.reset_index()
t_train = df2[targets_params.labels0]
x_train = df2[descriptors.labels0]
# print(f"{t_train=}")
# print(f"{x_train=}")
ndata = len(df_original.index)
columns = df_original.columns.to_list()
index = df_original.index.to_list()
# ncol = len(columns)
# descriptorにnanがあるデータを削除
drop_idx_all = []
drop_idx_train = []
for idx in index:
if x_all.iloc[idx].isnull().any():
drop_idx_all.append(idx)
if idx in idx_train:
drop_idx_train.append(idx)
# print("drop_idx_train=", drop_idx_train)
# print("drop_idx_all=", drop_idx_all)
# print("t_all=", t_all)
t_all = t_all.drop(index = drop_idx_all, axis = 0)
x_all = x_all.drop(index = drop_idx_all, axis = 0)
t_all = t_all.to_numpy()
x_all = x_all.to_numpy()
t_train = t_train.drop(index = drop_idx_train, axis = 0)
x_train = x_train.drop(index = drop_idx_train, axis = 0)
t_train = t_train.to_numpy()
x_train = x_train.to_numpy()
# ヘッダーの制御コードによって目的関数を変換。もとの目的関数は t_train_org に保存
t_train_org = []
for i in range(ntargets):
t_train_org.append(t_train)
if targets_params.modes[i] == 'min':
for j in range(len(t_train)):
t_train[j][i] = -t_train[j][i]
elif targets_params.modes[i] == 'value':
for j in range(len(t_train)):
t_train[j][i] = -(t_train[j][i] - targets_params.values[i])**2
data_all = tkParams()
data_train = tkParams()
data_all.ndata = ndata
data_all.columns = columns
data_all.indexes = index
data_all.targets = t_all
data_all.descriptors = x_all
data_train.ndata = len(idx_train)
data_train.columns = columns
data_train.indexes = idx_train
data_train.targets = t_train
data_train.descriptors = x_train
return df_original, targets_params, descriptors, data_all, data_train
[ドキュメント]
def execute(app, cparams, wait_by_input = True):
"""
概要: ガウス過程に基づくベイズ最適化の主要な実行ロジックです。
詳細説明:
データファイルの読み込み、記述子の標準化、PHYSBOライブラリを用いたベイズ最適化の実行、
結果の保存、および多様なグラフ(等高線図、パレート図、予測値と獲得関数)の生成を調整します。
ユーザー入力による反復学習の機能を持ち、グラフ上でのクリックイベントによって
データポイントの詳細情報を表示するインタラクティブな機能も提供します。
:param app: アプリケーションオブジェクト。
:type app: tkApplication
:param cparams: アプリケーションのパラメータオブジェクト。
:type cparams: tkParams
:param wait_by_input: グラフ表示後にユーザー入力でプログラムを一時停止するかどうか。
:type wait_by_input: bool
:returns: None
:rtype: None
"""
argv, narg = app.get_argv()
mtime = time.localtime(os.path.getctime(app.script_path))
app.print("")
app.print( "====================================================================================")
app.print(f" {argv[0]}: Perform Bayes optimization based on Gauss process ")
app.print(f" Requires PHYSBO: {physbo_url}")
app.print(f"# {app.script_path} ver {version[0]}.{version[1]}.{version[2]}")
app.print(f"# Modified time: {mtime.tm_year}/{mtime.tm_mon}/{mtime.tm_mday} {mtime.tm_hour}:{mtime.tm_min}:{mtime.tm_sec}")
app.print( "====================================================================================")
cparams.printinf(app)
# descriptors_params: 記述子のパラメータインスタンス
# targts_params: 目的関数のパラメータインスタンス
# ntargets: 目的関数の数
# descriptors : 記述子名 (制御子を含む)
# X_all: 全記述子
# t_all: 全目的関数値
# idx_train : 学習データの、全データ中のindex
# X_train : 学習データの記述子
# t_train_org: 学習データのmin,value変換前の値
# t_train : 学習データのmin,value変換後の値
# targets : 目的関数の元ラベル
# targets_label: 目的関数名(ラベルから制御子を除いたもの)
# targets_mode: min,max,value
# targets_value: 目的関数の値
df_original, targets_params, descriptors_params, data_all_params, data_train_params = load_data(app, cparams)
if df_original is None:
return None
X_all = data_all_params.descriptors
t_all = data_all_params.targets
X_train = data_train_params.descriptors
t_train = data_train_params.targets
idx_train = data_train_params.indexes
descriptors = descriptors_params.labels0
ndescriptors = len(descriptors)
ntargets = len(targets_params.labels0)
targets = targets_params.labels0
targets_label = targets_params.labels
targets_mode = targets_params.modes
targets_value = targets_params.values
while(1):#繰り返し回数を入力で決定
number = input('繰り返し回数を入力>>>')
if number.isdecimal():
number = int(number)
break
for l in range(number):
ndata = len(t_all)
n_traindata = len(idx_train)
n_testdata = ndata - n_traindata
if n_testdata == 0:
print("")
print( "====================================================================================")
print( " At least one data must be test data (blank target function)")
print( " A dummy data is added at top.")
print( "====================================================================================")
print( "")
line0 = X_all[0]
# print("line0=", line0)
X_all = np.insert(X_all, 0, line0, axis = 0)
none_list = [None for d in targets_label]
t_all = np.insert(t_all, 0, none_list, axis = 0)
ndata += 1
n_testdata += 1
# if wait_by_input:
# exit()
# else:
# return
# plot_indexes: 2D(等高線)-3Dグラフにプロットする記述子番号の辞書型変数 ('x', 'y', 'z')
# plot_labels : 2D(等高線)-3Dグラフにプロットする記述子名 (descriptorsから制御子を除いたもの)
# descriptors_label: 全記述子名 (descriptorsから制御子を除いたもの
plot_indexes, plot_labels, descriptors_label = get_plot_descriptors(descriptors.copy())
print("")
print(f"# of all data: {len(t_all)}")
print(f"# of training data: {n_traindata}")
print(f"# of descriptors : {ndescriptors}")
print(f"# of objective variables: {ntargets}")
print(f" Objective variables: {targets_label}")
print(f"# of descriptors: {len(descriptors)}")
print(f" Descriptors: {descriptors_label}")
print(f" Descriptors for plot: {plot_labels}")
# print("X_train=")
# print(X_train)
# print("t_train=")
# print(t_train)
# print("X_all=")
# print(X_all)
# print("t_all=")
# print(t_all)
# policy のセット
print("")
print("Make policy:")
print(" Training data indexes:")
print(idx_train)
print(" descriptors from X_train:")
print(X_train)
print(" descriptors from X_all:")
print(X_all[idx_train])
print(" objective values:")
print(t_train)
def standardize(X, labels):
"""
概要: 記述子データを標準化します。
詳細説明:
訓練データの平均と標準偏差を用いて、与えられた記述子データ `X` を標準化します。
標準化は `(X - mean) / std` の形式で行われます。
もしある記述子の標準偏差が0の場合、その記述子には標準化は適用されず、警告が表示されます。
:param X: 標準化する記述子データのNumPy配列。
:type X: np.ndarray
:param labels: 各記述子のラベルのリスト。
:type labels: list[str]
:returns: 標準化された記述子データ、各記述子の平均値、各記述子の標準偏差のタプル。
:rtype: tuple[np.ndarray, np.ndarray, np.ndarray]
"""
print("")
print("Standardize descriptors:")
X_std = X.copy()
stdX = np.std(X_train, 0)
meanX = np.mean(X_train, 0)
xstd = []
for i in range(len(stdX)):
sigma = stdX[i]
x0 = meanX[i]
if sigma == 0.0:
sigma = 1.0
x0 = 0.0
print(f" Warning: {labels[i]:>10} has no distribution (sigma = 0.0). Standardization not applied")
print(f" {labels[i]:>10}: X(standardized)[i] = (X[i] - {x0:12.4g}) / {sigma:12.4g}")
xstd.append((X.T[i] - x0) / sigma)
X_std= np.array(xstd).T
return X_std, meanX, stdX
if cparams.standardize:
X_std, meanX, stdX = standardize(X_all, descriptors)
else:
X_std = X_all
inf = []
for i in range(ntargets):
print("")
print(f"Objective #{i+1}")
# 2021-05-23 物性研CCMS講習会 本山裕一
# 「ベイズ最適化パッケージ PHYSBOの使い方」 physbo_usage.pdf
t_train1 = t_train.T[i]
policy = physbo.search.discrete.policy(test_X = X_std, initial_data = (idx_train, t_train1))
# policy = physbo.search.discrete.policy(test_X = X_all, initial_data = (idx_train, t_train1))
inf.append({"policy": policy})
# シード値のセット
seed = cparams.get("random_seed", None)
if seed is not None:
seed = pint(seed, defval = None)
if seed is not None:
policy.set_seed(0)
# bayes_searchは、simulator = Noneでは actions が返り、simulatorに関数を渡すと Hisotry object が返ってくる
print("")
print("Start Bayes search:")
actions = policy.bayes_search(max_num_probes = cparams.max_num_probes, simulator = None,
score = cparams.score_mode, interval = cparams.interval, num_rand_basis = cparams.num_rand_basis)
print("")
print("Bayse search:")
print("show_search_results")
physbo.search.utility.show_search_results(policy.history, 10)
# Hisotry objectの取得
res = policy.export_history()
best_fx, best_action = res.export_all_sequence_best_fx()
bayes_x = res.chosen_actions
# x_bayes = X_all[bayes_x]
x_bayes = X_std[bayes_x]
y_bayes = res.fx
# 獲得関数
# score = policy.get_score(mode = "EI", xs = X_all)
score = policy.get_score(mode = "EI", xs = X_std)
# 回帰。事後分布の平均値、分散
mean = policy.get_post_fmean(X_std)
var = policy.get_post_fcov(X_std)
# mean = policy.get_post_fmean(X_all)
# var = policy.get_post_fcov(X_all)
std = np.sqrt(var)
mean_m_sigma = mean - std
mean_p_sigma = mean + std
#print("score=", score)
idx_best = np.argmax(score)
print(" Best candidate :", idx_best, X_all[idx_best], mean[idx_best])
# print(" Best candidate from hisotry:", int(best_action[-1]), X_best, Y_best)
inf[i]["mean"] = mean
inf[i]["var"] = var
inf[i]["std"] =std
inf[i]["mean_m_sigma"] = mean - std
inf[i]["mean_p_sigma"] = mean + std
inf[i]["idx_best"] = idx_best
inf[i]["score"] = score
if cparams.get("outresfile", None) is None or cparams.outresfile == "":
dirname = get_dirname(cparams.infile)
# outresfile = replace_path(cparams.infile, 'search_result{i}.npz', {'i': '{i}'})
cparams.outresfile = app.replace_path(cparams.infile,
template = ["dir", "{filebody}-save{i}.npz"], ext_dict = {'i': '{i}'})
outresfile = cparams.outresfile.format(i = i + 1)
print("")
print(f"Save seach result to [{outresfile}]")
res.save(outresfile)
outfile = cparams.outfile.format(i = i + 1)
print("")
print(f"Save predictions to [{outfile}]")
print("X_all.T=")
print(X_all.T)
zlist = zip(*X_all.T, t_all.T[i], mean, mean_m_sigma, mean_p_sigma)
df = pd.DataFrame(list(zlist),
columns = [*descriptors_params.labels0, targets_params.labels0[i], 'mean', 'mean-std', 'mean+std'])
df.to_excel(outfile)
#=====================================================
# plot
#=====================================================
# TkAggを使うと、自動的に複数のウインドウ位置がずれて表示してくれる
# しかし、TkAggを使うと、plt.pause()のあと、input()でグラフウインドウの制御ができなくなる
if not wait_by_input:
matplotlib.use('TkAgg')
def get_winpos(plt):
"""
概要: 現在のmatplotlibウィンドウの位置とサイズを取得します。
詳細説明:
`wait_by_input` が `True` の場合は常に `None` を返します。
それ以外の場合、matplotlibのウィンドウマネージャーからウィンドウのジオメトリ文字列を取得し、
それを解析して幅、高さ、X座標、Y座標のタプルを整数で返します。
:param plt: matplotlib.pyplotモジュール。
:type plt: module
:returns: ウィンドウの (幅, 高さ, X座標, Y座標) のタプル、または `None`。
:rtype: tuple[int, int, int, int] | None
"""
if wait_by_input:
return None
pos = plt.get_current_fig_manager().window.wm_geometry()
print("pos=", pos[0])
m = re.match(r'(\d+)x(\d+)\+(\d+)\+(\d+)', pos)
if m:
g = m.groups()
return pint(g[0]), pint(g[1]), pint(g[2]), pint(g[3])
return None
def set_winpos(plt, x, y):
"""
概要: matplotlibウィンドウの位置を設定します。
詳細説明:
`wait_by_input` が `True` の場合は何もしません。
それ以外の場合、matplotlibのウィンドウマネージャーを使用して、
指定されたX座標とY座標にウィンドウを移動します。
:param plt: matplotlib.pyplotモジュール。
:type plt: module
:param x: ウィンドウの新しいX座標。
:type x: int
:param y: ウィンドウの新しいY座標。
:type y: int
:returns: None
:rtype: None
"""
if wait_by_input:
return
plt.get_current_fig_manager().window.wm_geometry(f"+{x}+{y}")
nxy = int(sqrt(ntargets) + 0.5)
nx_fig = nxy
ny_fig = nxy
if nx_fig * ny_fig < ntargets:
nx_fig += 1
axis_inf = []
win_pos = None
fig_contour = None
fig_pareto = None
fig_scores = None
print("")
print("Plot")
print(f" ntargets={ntargets} in graphs {nx_fig} x {ny_fig}")
print(f" ndescriptors={ndescriptors}")
if ndescriptors > 1:
##############################################################################
# 記述子が複数ある場合、等高線図を描く
# x軸はidx_x、y軸はidx_yで指定される記述子 X_allT[idx_x]/[idx_y]
##############################################################################
# indexes of descriptors for 2D plots
idx_x = plot_indexes['x']
idx_y = plot_indexes['y']
print(f" indexes for 2D contour plot: idx={idx_x} for x-axis, {idx_y} for y-axis")
# mean図のグラフ枠
fig_contour, axes = plt.subplots(ny_fig, nx_fig, figsize = cparams.figsize)
fig_contour.canvas.manager.set_window_title('contour: mean')
# if ny_fig > 1 or nx_fig > 1:
# fig_contour = [fig_contour]
# std図のグラフ枠
fig_std_contour, axes_std = plt.subplots(ny_fig, nx_fig, figsize = cparams.figsize)
fig_std_contour.canvas.manager.set_window_title('contour: std')
if not isinstance(axes_std, np.ndarray) and type(axes_std) is not list:
# if ny_fig == 1 and nx_fig == 1:
axes_std = [axes_std]
axes = np.reshape(axes, nx_fig * ny_fig)
axes_std = np.reshape(axes_std, nx_fig * ny_fig)
mean_list = []
std_list = []
for i in range(ntargets):
ax1 = axes[i]
ax1_std = axes_std[i]
# if isinstance(ax1_std, np.ndarray):
# ax1_std = ax1_std[0]
ax1.tick_params(labelsize = cparams.fontsize)
ax1_std.tick_params(labelsize = cparams.fontsize)
policy = inf[i]["policy"]
ndata = len(X_all)
ndesc = len(X_all[idx_x])
X_allT = X_all.T
# x1_unique = sorted(set(X_allT[idx_x]))
# x2_unique = sorted(set(X_allT[idx_y]))
# nx = len(x1_unique)
# ny = len(x2_unique)
xmin1 = min(X_allT[idx_x])
xmax1 = max(X_allT[idx_x])
if xmin1 == xmax1:
xmin1, xmax1 = xmin1 - 0.5, xmin1 + 0.5
xmin2 = min(X_allT[idx_y])
xmax2 = max(X_allT[idx_y])
if xmin2 == xmax2:
xmin2, xmax2 = xmin2 - 0.5, xmin2 + 0.5
xstep1 = (xmax1 - xmin1) / (cparams.nx_2D - 1)
xstep2 = (xmax2 - xmin2) / (cparams.ny_2D - 1)
x1_unique = np.arange(xmin1, xmax1 + xstep1 * 0.5, xstep1)
x2_unique = np.arange(xmin2, xmax2 + xstep2 * 0.5, xstep2)
# print("x1_unique=", x1_unique)
# print("x2_unique=", x2_unique)
# 2Dプロット用の記述子の組 X_plot を作成。描画x,y軸用以外の記述子は、最初のデータの記述子の値 X_allT[0] を使う
n_plot = cparams.nx_2D * cparams.ny_2D
X_plot = []
for ix in range(cparams.nx_2D):
for iy in range(cparams.ny_2D):
l = []
for idx in range(len(X_all[0])):
if idx == idx_x:
l.append(x1_unique[ix])
elif idx == idx_y:
l.append(x2_unique[iy])
else:
l.append(X_all[0][idx])
X_plot.append(l)
if cparams.standardize:
X_plot_std, meanX_plat, stdX_plat = standardize(np.array(X_plot), descriptors)
else:
X_plot_std = np.array(X_plot)
mean = policy.get_post_fmean(X_plot_std)
var = policy.get_post_fcov(X_plot_std)
std = np.sqrt(var)
mean_list.append(mean)
std_list.append(std)
mean_plot = np.zeros([cparams.nx_2D, cparams.ny_2D])
c = 0
index_mean = []
for ix in range(cparams.nx_2D):
for iy in range(cparams.ny_2D):
mean_plot[ix][iy] = mean[c]
index_mean.append(c)
c += 1
std_plot = np.zeros([cparams.nx_2D, cparams.ny_2D])
c = 0
index_std = []
for ix in range(cparams.nx_2D):
for iy in range(cparams.ny_2D):
std_plot[ix][iy] = std[c]
index_std.append(c)
c += 1
# print("len=", len(x1_unique), len(x2_unique), len(mean_plot), len(mean_plot[idx_x]))
# mean値のマップ描画
cont = ax1.contourf(x1_unique, x2_unique, mean_plot.T, cmap = "jet_r")
#複数グラフにcolorbarをつける
# https://bourbaki.biz/fit-colorbar-to-a-graph-on-matplotlib/
div = make_axes_locatable(ax1)
cax = div.append_axes("right", size = "5%", pad = 0.1)
cb_mean = fig_contour.colorbar(cont, cax = cax)
cb_mean.ax.tick_params(labelsize = cparams.fontsize)
ax1.set_title(targets_label[i], fontname="MS Gothic")
ax1.set_xlabel(descriptors_label[idx_x], fontsize = cparams.fontsize, fontname="MS Gothic")
ax1.set_ylabel(descriptors_label[idx_y], fontsize = cparams.fontsize, fontname="MS Gothic")
# std値のマップ描画
cont_std = ax1_std.contourf(x1_unique, x2_unique, std_plot.T, cmap = "jet_r")
#複数グラフにcolorbarをつける
# https://bourbaki.biz/fit-colorbar-to-a-graph-on-matplotlib/
div_std = make_axes_locatable(ax1_std)
cax_std = div_std.append_axes("right", size = "5%", pad = 0.1)
cb_std = fig_std_contour.colorbar(cont_std, cax = cax_std)
cb_std.ax.tick_params(labelsize = cparams.fontsize)
ax1_std.set_title(targets_label[i], fontname="MS Gothic")
ax1_std.set_xlabel(descriptors_label[idx_x], fontsize = cparams.fontsize, fontname="MS Gothic")
ax1_std.set_ylabel(descriptors_label[idx_y], fontsize = cparams.fontsize, fontname="MS Gothic")
for j in range(cparams.nx_2D):
ax1.scatter([x1_unique[j]] * cparams.ny_2D, x2_unique, s = 5.0,
marker = 'o', c = 'yellow', alpha = 0.2, linewidth = 0.5, edgecolors = 'black')
ax1_std.scatter([x1_unique[j]] * cparams.ny_2D, x2_unique, s = 5.0,
marker = 'o', c = 'yellow', alpha = 0.2, linewidth = 0.5, edgecolors = 'black')
t = t_train.T[i]
tmin = min(t)
tmax = max(t)
size = (t - tmin) / (tmax - tmin)
# size = log((t - tmin) / (tmax - tmin) * 1000.0 + 1.0)
# ax1.scatter(X_train.T[idx_x], X_train.T[idx_y], s = 300.0,
ax1.scatter(X_train.T[idx_x], X_train.T[idx_y], s = size * 300.0,
marker = '*', c = 'yellow', alpha = 0.5, linewidth = 0.5, edgecolors = 'black')
# tmax = max(abs(t_train.T[i]))
# ax1.scatter(X_train.T[idx_x], X_train.T[idx_y], s = t_train.T[i] / tmax * 500,
# marker = '*', c = 'yellow', alpha = 0.7, linewidth = 0.5, edgecolors = 'black')
ax1_std.scatter(X_train.T[idx_x], X_train.T[idx_y], s = size * 300.0,
marker = '*', c = 'yellow', alpha = 0.5, linewidth = 0.5, edgecolors = 'black')
axis_inf.append({"type": "contour", "label": f"{targets_label[i]}", "axis": ax1,
"x1": x1_unique, "y1": x2_unique,
"x2": X_train.T[idx_x], "y2": X_train.T[idx_y],
"idx": index_mean, "descriptors": X_plot, "target": None,
"mean": mean, "std": std, "score": None})
axis_inf.append({"type": "contour", "label": f"{targets_label[i]}", "axis": ax1_std,
"x1": x1_unique, "y1": x2_unique,
"x2": X_train.T[idx_x], "y2": X_train.T[idx_y],
"idx": index_std, "descriptors": X_plot, "target": None,
"mean": mean, "std": std, "score": None})
plt.tight_layout()
plt.pause(0.1)
# win_pos = get_winpos(plt)
# pos = [win_pos[2] + 10, win_pos[3] + 10]
if ntargets > 1:
# 目的関数が複数ある場合、パレート図を描く
# indexes of objective functions for Pareto plot
idx_Pareto_x = targets_params.indexes[0]
idx_Pareto_y = targets_params.indexes[1]
print(f" indexes for Pareto plot: idx={idx_Pareto_x} for x-axis, {idx_Pareto_y} for y-axis")
fig_pareto, ax2 = plt.subplots(1, 1, figsize = cparams.figsize)
fig_pareto.canvas.manager.set_window_title('Pareto')
ax2.scatter(mean_list[idx_Pareto_x], mean_list[idx_Pareto_y], s = 20.0,
marker = 'o', c = 'red', alpha = 1.0, linewidth = 0.5, edgecolors = 'black')
ax2.scatter(t_train.T[idx_Pareto_x], t_train.T[idx_Pareto_y], s = 300.0,
marker = '*', c = 'blue', alpha = 0.5, linewidth = 0.5, edgecolors = 'black')
ax2.set_xlabel(targets_label[idx_Pareto_x], fontsize = cparams.fontsize, fontname="MS Gothic")
ax2.set_ylabel(targets_label[idx_Pareto_y], fontsize = cparams.fontsize, fontname="MS Gothic")
axis_inf.append({"type": "Pareto", "label": "Pareto", "axis": ax2,
"x1": mean_list[idx_Pareto_x], "y1": mean_list[idx_Pareto_y],
"x2": t_train.T[idx_Pareto_x], "y2": t_train.T[idx_Pareto_y],
"idx": idx_train, "descriptors": X_plot, "target": None,
"mean": mean, "std": std, "score": None})
plt.tight_layout()
# set_winpos(plt, pos[0], pos[1])
# pos = [pos[0] + 10, pos[1] + 10]
plt.pause(0.1)
# 予測値、獲得関数のグラフを描く
fig_scores, axes = plt.subplots(ny_fig, nx_fig, figsize = cparams.figsize)
fig_scores.canvas.manager.set_window_title('prediction and aquisition functions')
axes = np.reshape(axes, nx_fig * ny_fig)
for i in range(ntargets):
t_train1 = t_train.T[i]
mean = inf[i]["mean"]
var = inf[i]["var"]
std = inf[i]["std"]
mean_m_sigma = inf[i]["mean_m_sigma"] #= mean - std
mean_p_sigma = inf[i]["mean_p_sigma"] #= mean + std
score = inf[i]["score"]
idx_best = inf[i]["idx_best"]
ax3 = axes[i]
ax3b = ax3.twinx()
"""
ax1.plot(best_fx, label = 'best action', color = 'black')
ax1.set_xlabel("sequence", fontsize = cparams.fontsize, fontname="MS Gothic")
ax1.set_ylabel("value", fontsize = cparams.fontsize, fontname="MS Gothic")
ax1.tick_params(labelsize = cparams.fontsize)
ax1.legend(fontsize = cparams.legend_fontsize, loc = 'best')
"""
x = range(len(X_all))
ins1 = ax3.plot(idx_train, t_train1, label = 'training', linestyle = '', marker = 'o') # markersize=10)
ins3 = ax3.plot(x, mean, label = 'mean', color = 'black', linewidth = 0.5)
# ax3.plot(x, mean + std, color = 'blue', linewidth = 0.3)
# ax3.plot(x, mean - std, color = 'blue', linewidth = 0.3)
ins6 = ax3b.plot(x, score, label = f'score {cparams.score_mode}', color = 'red', linestyle = 'dashed', linewidth = 0.5)
ins7 = ax3.plot(idx_best, mean[idx_best], label = 'best candidate', linestyle = '', marker = '*', markersize = 10.0)
ax3.fill_between(x, mean_m_sigma, mean_p_sigma, color='b', alpha=.1)
if targets_mode[i] == 'value':
ax3.plot(ax3.get_xlim(), [0.0, 0.0], linestyle = 'dashed', linewidth = 0.5, color = 'red')
# axis_inf.append({"label": f"{targets_label[i]}_L", "axis": ax3, "x1": idx_train, "y1": t_train1, "x2": x, "y2": mean})
axis_inf.append({"type": "score", "label": f"{targets_label[i]}_R", "axis": ax3b,
"x1": x, "y1": score,
"x2": None, "y2": None,
"idx": idx_train, "descriptors": X_all, "target": t_all.T[i],
"mean": mean, "std": std, "score": score})
ax3.minorticks_on()
# ax3.set_xticklabels(range(ndata))
ax3.grid(which = "major", axis = "x", color = "green", alpha = 0.5, linestyle = '--', linewidth = 0.5)
ax3.grid(which = "minor", axis = "x", color = "green", alpha = 0.5, linestyle = '--', linewidth = 0.1)
ax3.set_xlabel( "index", fontsize = cparams.fontsize, fontname="MS Gothic")
ax3.set_ylabel(targets_label[i], fontsize = cparams.fontsize, fontname="MS Gothic")
ax3b.set_ylabel(f"score {cparams.score_mode}", fontsize = cparams.fontsize)
# ax3b.set_yscale('log')
ax3.tick_params( labelsize = cparams.fontsize)
ax3b.tick_params(labelsize = cparams.fontsize)
ins = ins1 + ins3 + ins6 + ins7
ax3.legend(ins, [l.get_label() for l in ins], fontsize = cparams.legend_fontsize, loc = 'best')
# ax1.set_ylim(ax3.get_ylim())
plt.tight_layout()
# set_winpos(plt, pos[0], pos[1])
# pos = [pos[0] + 10, pos[1] + 10]
plt.pause(0.1)
for i in range(ntargets):
add_data = []
idx_best = inf[i]["idx_best"]
for k in range(ntargets):
add_data = np.append(add_data,(inf[k]["mean"])[idx_best])
print(add_data)
#予想データを訓練データに挿入(1列×変数の数分の行)
t_train = np.insert(t_train,len(idx_train), add_data, axis=0)
#予想データの記述子を訓練データの記述子に追加
X_train = np.insert(X_train,len(idx_train), X_all[idx_best],axis=0)
#予想データのインデックスを追加
idx_train = np.append(idx_train, idx_best)
print(X_train)
print(t_train)
print(idx_train)
l = +1
# マウスクリック時に最近接データの情報をコンソールに表示
hit_itarget = None
hit_axisinf = None
# マウスクリック時に最近接データを検索
def find_nearest_data(type, x, y, xlist, ylist, idx_list, x2list, y2list):
"""
概要: クリックされた座標に最も近いデータポイントを見つけます。
詳細説明:
グラフ上でクリックされた (x, y) 座標に基づいて、指定されたデータリスト `xlist`, `ylist`
(およびオプションで `x2list`, `y2list`)から最も近いデータポイントを検索します。
検索はプロットの `type` (score, contour, Pareto) に応じて調整されます。
見つかった最も近いデータポイントのインデックスとその二乗距離を返します。
:param type: プロットのタイプ ('score', 'contour', 'Pareto')。
:type type: str
:param x: クリックされたX座標。
:type x: float
:param y: クリックされたY座標。
:type y: float
:param xlist: メインのデータ点のX座標リスト。
:type xlist: list | np.ndarray
:param ylist: メインのデータ点のY座標リスト。
:type ylist: list | np.ndarray
:param idx_list: メインのデータ点に対応するインデックスリスト(`X_plot` のインデックス)。
:type idx_list: list | np.ndarray
:param x2list: セカンダリのデータ点(例: 訓練データ)のX座標リスト。
:type x2list: list | np.ndarray | None
:param y2list: セカンダリのデータ点(例: 訓練データ)のY座標リスト。
:type y2list: list | np.ndarray | None
:returns:
- ihit (int | None): 最も近いメインデータのインデックス。
- minr2 (float): 最も近いメインデータとの二乗距離。
- ihitb (int | None): 最も近いセカンダリデータ(もしあれば)のインデックス。
- minr2b (float): 最も近いセカンダリデータ(もしあれば)との二乗距離。
:rtype: tuple[int | None, float, int | None, float]
"""
minr2 = 1.0e300
ihit = None
minr2b = 1.0e300
ihitb = None
if type == 'score':
for i in range(len(xlist)):
xi = xlist[i]
yi = ylist[i]
r2 = (x - xi)**2 + (y - yi)**2
# print("i=", i, x, y, xi, yi, r2, end = '')
if minr2 > r2:
minr2 = r2
ihit = i
# print(" => ", minr2, ihit)
elif type == 'contour':
c = 0
for xi_val in xlist:
for yi_val in ylist:
r2 = (x - xi_val)**2 + (y - yi_val)**2
# print("x=", c, x, y, xi_val, yi_val, r2, end = '')
if minr2 > r2:
minr2 = r2
ihit = idx_list[c]
# print(" => ", minr2, ihit)
c += 1
for i in range(len(x2list)):
xi = x2list[i]
yi = y2list[i]
r2 = (x - xi)**2 + (y - yi)**2
# print("b: x=", i, x, y, xi, yi, r2, end = '')
if minr2b > r2:
minr2b = r2
ihitb = i
# print(" => ", minr2b, ihitb)
elif type == 'Pareto':
for i in range(len(xlist)):
xi = xlist[i]
yi = ylist[i]
r2 = (x - xi)**2 + (y - yi)**2
# print("x=", i, x, y, xi, yi, r2, end = '')
if minr2 > r2:
minr2 = r2
ihit = i
# print(" => ", minr2, ihit)
for i in range(len(x2list)):
xi = x2list[i]
yi = y2list[i]
r2 = (x - xi)**2 + (y - yi)**2
# print("b: x=", i, x, y, xi, yi, r2, end = '')
if minr2b > r2:
minr2b = r2
ihitb = i
# print(" => ", minr2b, ihitb)
else:
print("")
print(f"Error: Invalid type [{type}] in find_nearest_data()")
exit()
return ihit, minr2, ihitb, minr2b
def onclick(event):
"""
概要: matplotlibグラフ上でのマウスクリックイベントを処理します。
詳細説明:
グラフがクリックされたときに呼び出されます。
クリックされた軸を特定し、その軸上で最も近いデータポイント(予測値または訓練データ)を検索します。
見つかったデータポイントの記述子、予測された目的関数値、標準偏差、スコアなどの情報を
コンソールに表示します。
:param event: matplotlibのMouseEventオブジェクト。クリック座標や関連する軸情報が含まれます。
:type event: matplotlib.backend_bases.MouseEvent
:returns: None
:rtype: None
"""
# マウスクリック時にクリックされたグラフ軸を特定
# print("click", event.inaxes, ax1, ax3, ax3b)
nonlocal hit_itarget, hit_axisinf
for i in range(len(axis_inf)):
# print("a=", event.inaxes, axis_inf[i]["axis"])
if event.inaxes == axis_inf[i]["axis"]:
hit_itarget = i
hit_axisinf = axis_inf[i]
break
else:
return
# 最近接データを取得
xe, ye = event.xdata, event.ydata
idx, r2, idxb, r2b = find_nearest_data(hit_axisinf["type"], xe, ye,
hit_axisinf["x1"], hit_axisinf["y1"], hit_axisinf["idx"],
hit_axisinf["x2"], hit_axisinf["y2"])
# 最近接データの情報をコンソールに表示
app.print("")
# print("descriptors=", hit_axisinf['descriptors'])
# print("descriptors=", hit_axisinf['descriptors'][idx])
if hit_axisinf["type"] == 'score':
app.print(f"clicked in {hit_axisinf['label']} near the data index={idx} / the excel line {idx+2}:")
app.print(f" descriptors = {hit_axisinf['descriptors'][idx]} given objective value = {hit_axisinf['target'][idx]}")
app.print(f" predicted objective value = {hit_axisinf['mean'][idx]:10.4g} +- {hit_axisinf['std'][idx]:10.4g}")
app.print(f" score = {hit_axisinf['score'][idx]:10.4g}")
elif hit_axisinf["type"] == 'contour':
# indexes of descriptors for 2D plots
idx_x = plot_indexes['x']
idx_y = plot_indexes['y']
app.print(f"clicked in {hit_axisinf['label']} at the data index={idx} near {descriptors[idx_x]}={xe:10.4g} and {descriptors[idx_y]}={ye:10.4g}")
app.print(f" descriptors = {hit_axisinf['descriptors'][idx]}")
app.print(f" predicted objective value = {hit_axisinf['mean'][idx]:10.4g} +- {hit_axisinf['std'][idx]:10.4g}")
app.print(f" Nearest training data:")
app.print(f" descriptors = {X_train[idxb]} objective value={t_train[idxb]}")
elif hit_axisinf["type"] == 'Pareto':
# indexes of objective functions for Pareto plot
idx_Pareto_x = targets_params.indexes[0]
idx_Pareto_y = targets_params.indexes[1]
app.print(f"clicked in {hit_axisinf['label']} at the data index={idx} near {targets_label[idx_Pareto_x]}={xe:10.4g} and {targets_label[idx_Pareto_y]}={ye:10.4g}")
app.print(f" descriptors = {hit_axisinf['descriptors'][idx]}")
app.print(f" predicted objective values:")
for i in range(ntargets):
app.print(f" #{i+1} for {targets_label[i]}: {mean_list[i][idx]:10.4g} +- {std_list[i][idx]:10.4g}")
app.print(f" Nearest training data:")
app.print(f" descriptors = {X_train[idxb]} objective value={t_train[idxb]}")
else:
print("")
print(f"Error: Invalid type [{type}] in onclick()")
exit()
if fig_contour:
fig_contour.canvas.mpl_connect("button_press_event", onclick)
if fig_pareto:
fig_pareto.canvas.mpl_connect("button_press_event", onclick)
if fig_scores:
fig_scores.canvas.mpl_connect("button_press_event", onclick)
# fig_scores.canvas.mpl_connect("motion_notify_event", hover)
print("")
usage(app)
if wait_by_input:
print("")
print("Press ENTER to terminate")
input()
if __name__ == "__main__":
"""
概要: スクリプトが直接実行されたときのエントリポイントです。
詳細説明:
`initialize` 関数を呼び出してアプリケーションとパラメータを初期化し、
`update_vars` 関数でコマンドライン引数に基づいてパラメータを更新します。
その後、ベイズ最適化の主要なロジックを含む `execute` 関数を実行し、
最後に `terminate` 関数でプログラムを終了します。
"""
app, cparams = initialize()
update_vars(app, cparams)
execute(app, cparams)
terminate(app, "", usage = None)