optimize.optimize_ga_swarm_remc のソースコード

"""
多目的最適化手法(GA、PSO、REMC、Nelder-Mead)を比較するためのスクリプト

本スクリプトは、指定された最適化手法(遺伝的アルゴリズム、粒子群最適化、レプリカ交換モンテカルロ法、Nelder-Meadシンプレックス法)を用いて、Ackley関数をベースとした目的関数を最小化します。
パラメータ数(2または4)に応じて最適化対象の次元が変わり、コマンドライン引数を通じて各種設定を動的に変更することが可能です。
2パラメータの場合には、目的関数の3Dプロットも表示されます。

関連リンク: :doc:`optimize_ga_swarm_remc_usage`
"""

import sys
import random
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


# 最適化するパラメータの数。2または4が選択可能。
nparameters = 4   # 2 or 4

# 使用する最適化手法のデフォルト値。コマンドライン引数で上書き可能。
method = "ga"

# 各変数の探索範囲を定義。`nparameters` の値によって初期設定が変わる。
if nparameters == 4:
    bounds = [[-10, 10], [-10, 10], [-10, 10], [-10, 10]]
else:
    bounds = [[-10, 10], [-10, 10]]
#bounds = [(-20, 20), (-20, 20)]
#bounds = [(-30, 30), (-30, 30)]

# 最適化手法(主にNelder-Mead)の初期推測値。`nparameters` の値によって初期設定が変わる。
if nparameters == 4:
    initial_guess = [5, 5, 5, 5]
else:
    initial_guess = [5, 5]
#initial_guess = [50, 50]

# GAやPSOにおける母集団(または粒子)の個体数。
nparents = 50

# 最大イテレーション回数。
nmaxiter = 100

# 収束判定に使用する許容誤差。
tol = 1.0e-3

# 詳細な出力レベル。0は最小限、1は詳細。
print_level = 0


# コマンドライン引数の解析と設定
# スクリプト実行時に引数が指定された場合、上記デフォルト値を上書きします。
if __name__ == "__main__":
    nargs = len(sys.argv)
    if nargs > 1: method = sys.argv[1] # 最適化手法 (ga, pso, remc, simplex)
    if nargs > 2: nparents = int(sys.argv[2]) # 母集団サイズ/粒子数/レプリカ数
    if nargs > 3: tol      = float(sys.argv[3]) # 収束許容誤差
    if nargs > 4: nmaxiter = int(sys.argv[4]) # 最大イテレーション数
    if nargs > 5: # bounds[0][0] と bounds[1][0] の下限値
        bounds[0][0] = float(sys.argv[5])
        bounds[1][0] = bounds[0][0]
    if nargs > 6: # bounds[0][1] と bounds[1][1] の上限値
        bounds[0][1] = float(sys.argv[6])
        bounds[1][1] = bounds[0][1]
    if nargs > 7: print_level = int(sys.argv[7]) # 出力レベル


[ドキュメント] def AckleyFunc(X, Y): """ Ackley関数を計算します。 Ackley関数は、広範囲の平坦な領域と多数の局所最適解を持つことで知られる 多目的最適化のテスト関数です。 Parameters ---------- :param X: 第一変数。 :type X: float or numpy.ndarray :param Y: 第二変数。 :type Y: float or numpy.ndarray Returns ------- :returns: Ackley関数の値。 :rtype: float or numpy.ndarray """ t1 = 20 t2 = -20 * np.exp(-0.2 * np.sqrt(1.0 / 2 * (X**2 + Y**2 ))) t3 = np.e t4 = -np.exp(1.0 / 2 * (np.cos(2 * np.pi * X)+np.cos(2 * np.pi * Y))) return t1 + t2 + t3 + t4
# 目的関数の呼び出し回数をカウントするグローバル変数。 icall = 0
[ドキュメント] def minimize_func(xk): """ 最適化の目的関数を定義します。 グローバル変数 `nparameters` の値に応じて、2変数または4変数の Ackley関数ベースの関数を返します。 `icall` グローバル変数をインクリメントして、関数呼び出し回数を記録します。 Parameters ---------- :param xk: 最適化されるパラメータの配列。 :type xk: numpy.ndarray Returns ------- :returns: 計算された目的関数の値。 :rtype: float """ global icall icall += 1 if nparameters == 4: return AckleyFunc(xk[0], xk[1]) + 2.0 * AckleyFunc(xk[2], xk[3]) else: return AckleyFunc(xk[0], xk[1])
# 最適化イテレーション回数をカウントするグローバル変数。 iter = 0
[ドキュメント] def callback(xk): """ 最適化プロセス中のコールバック関数。 `print_level` が1の場合、現在のイテレーション数、目的関数値、および 現在のパラメータを出力します。 `iter` グローバル変数をインクリメントして、コールバック呼び出し回数を記録します。 Parameters ---------- :param xk: 現在の最適解候補のパラメータ配列。 :type xk: numpy.ndarray Returns ------- :returns: なし :rtype: None """ global iter if print_level: print(f"iter={iter}: fmin=", minimize_func(xk), " xk:", xk) iter += 1
[ドキュメント] def usage(): """ スクリプトの正しい使用方法を標準出力に表示します。 コマンドライン引数のフォーマットとそれぞれの意味について説明します。 Parameters ---------- :param: なし Returns ------- :returns: なし :rtype: None """ print(f"\nUsage: python {sys.argv[0]} method=[ga|pso|remc|simplex] nparents tol nmaxiter\n")
[ドキュメント] def sampling(bounds): """ 指定された範囲に基づいてランダムな点をサンプリングします。 各次元について、`bounds` で定義された下限と上限の間で一様乱数を生成します。 Parameters ---------- :param bounds: 各変数の下限と上限を定義するリスト。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] Returns ------- :returns: サンプリングされたランダムな点の配列。 :rtype: numpy.ndarray """ return np.array([random.uniform(low, high) for low, high in bounds])
####################### # GA #######################
[ドキュメント] def mutate(individual, bounds, mutation_rate = 0.01): """ 遺伝的アルゴリズムにおける個体を突然変異させます。 各遺伝子について、`mutation_rate` の確率で、その遺伝子の範囲内で 新しいランダムな値に置き換えます。 Parameters ---------- :param individual: 突然変異させる個体(パラメータ配列)。 :type individual: numpy.ndarray :param bounds: 各遺伝子の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] :param mutation_rate: 突然変異が発生する確率。 :type mutation_rate: float, optional Returns ------- :returns: なし (個体はインプレースで変更されます) :rtype: None """ for i in range(len(individual)): if random.random() < mutation_rate: individual[i] = random.uniform(bounds[i][0], bounds[i][1])
[ドキュメント] def crossover(parent1, parent2): """ 遺伝的アルゴリズムにおける2つの親個体から子個体を生成します。 一点交叉 (one-point crossover) を使用し、親1と親2の遺伝子を組み合わせて 2つの新しい子個体を作成します。 Parameters ---------- :param parent1: 最初の親個体(パラメータ配列)。 :type parent1: numpy.ndarray :param parent2: 第二の親個体(パラメータ配列)。 :type parent2: numpy.ndarray Returns ------- :returns: 生成された2つの子個体。 :rtype: tuple[numpy.ndarray, numpy.ndarray] """ crossover_point = random.randint(1, len(parent1) - 1) child1 = np.concatenate((parent1[:crossover_point], parent2[crossover_point:])) child2 = np.concatenate((parent2[:crossover_point], parent1[crossover_point:])) return child1, child2
[ドキュメント] def select_best_from_random_group(population, scores, k = 3): """ ランダムに選択された個体群の中から最も良いスコアを持つ個体を選択します(トーナメント選択)。 `population` から `k` 個の個体をランダムに選び、その中から目的関数のスコアが 最も低い(最良の)個体を返します。 Parameters ---------- :param population: 現在の世代の全個体リスト。 :type population: list[numpy.ndarray] :param scores: `population` の各個体に対応する目的関数のスコアリスト。 :type scores: list[float] :param k: トーナメントに参加する個体数。 :type k: int, optional Returns ------- :returns: ランダムに選ばれたグループの中で最もスコアが良い個体。 :rtype: numpy.ndarray """ selected = random.choices(population, k = k) # selectedから、scoresに対応するindexを検索してscoresに入れる # selected_scores = [scores[i] for i, ind in enumerate(population) if any((ind == s).all() for s in selected)] selected_scores = [] selected_params = [] for i, ind in enumerate(population): for s in selected: if (ind == s).all(): selected_scores.append(scores[i]) selected_params.append(s) break # if not selected_scores: # selected_scoresが空の場合の対処 # return random.choice(population) idx = np.argmin(selected_scores) return selected_params[idx]
[ドキュメント] def genetic_algorithm(objective_function, bounds, population_size, callback = None, tol = 1.0e-5, nmaxiter = 1000, mutation_rate = 0.01): """ 遺伝的アルゴリズム (GA) を用いて最適化問題を解きます。 指定された目的関数を最小化するために、選択、交叉、突然変異の操作を繰り返し、 個体群を進化させます。 Parameters ---------- :param objective_function: 最小化する目的関数。`numpy.ndarray` を引数にとり、`float` を返します。 :type objective_function: callable :param bounds: 各変数の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] :param population_size: 個体群のサイズ(世代ごとの個体数)。 :type population_size: int :param callback: 各世代の終わりに呼び出されるコールバック関数。`numpy.ndarray` を引数にとります。 :type callback: callable, optional :param tol: 目的関数の値がこの許容誤差を下回った場合に収束と見なします。 :type tol: float, optional :param nmaxiter: 最大世代数(イテレーション数)。 :type nmaxiter: int, optional :param mutation_rate: 突然変異が発生する確率。 :type mutation_rate: float, optional Returns ------- :returns: 最も良かった個体のパラメータと、そのときの目的関数値。 :rtype: tuple[numpy.ndarray, float] """ population = [sampling(bounds) for _ in range(population_size)] best_individual = population[0] best_score = objective_function(best_individual) for generation in range(nmaxiter): scores = [objective_function(ind) for ind in population] new_population = [] for i in range(population_size // 2): parent1 = select_best_from_random_group(population, scores) parent2 = select_best_from_random_group(population, scores) child1, child2 = crossover(parent1, parent2) mutate(child1, bounds, mutation_rate) mutate(child2, bounds, mutation_rate) new_population.extend([child1, child2]) population = new_population current_best = min(population, key=objective_function) current_best_score = objective_function(current_best) if current_best_score < best_score: best_individual, best_score = current_best, current_best_score if callback: callback(best_individual) if current_best_score < tol: print("Converged\n") return best_individual, best_score print("Not converged\n") return best_individual, best_score
########################## # sworm optimization ##########################
[ドキュメント] class Particle: """ 粒子群最適化 (PSO) における個々の粒子を表すクラス。 各粒子は、現在位置、速度、自己最良位置、自己最良スコアを保持します。 """ def __init__(self, bounds): """ Particleクラスのコンストラクタ。粒子を初期化します。 粒子の位置は `bounds` に基づいてランダムに初期化され、速度は0に初期化されます。 自己最良位置とスコアも初期化されます。 Parameters ---------- :param bounds: 各次元の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] """ self.position = np.array([np.random.uniform(low, high) for low, high in bounds]) self.velocity = np.array([0.0 for _ in bounds]) self.best_position = self.position.copy() self.best_score = float('inf')
[ドキュメント] def update_velocity(self, global_best_position, inertia, cognitive, social): """ 粒子の速度を更新します。 現在の速度、自己最良位置への傾向、およびグローバル最良位置への傾向に基づいて、 粒子の速度を更新します。 Parameters ---------- :param global_best_position: 全粒子の中で見つかった最良の位置。 :type global_best_position: numpy.ndarray :param inertia: 慣性重み。 :type inertia: float :param cognitive: 認知係数(自己の最良位置への影響度)。 :type cognitive: float :param social: 社会係数(全体最適位置への影響度)。 :type social: float Returns ------- :returns: なし (粒子の速度はインプレースで更新されます) :rtype: None """ r1, r2 = np.random.rand(2) cognitive_velocity = cognitive * r1 * (self.best_position - self.position) social_velocity = social * r2 * (global_best_position - self.position) self.velocity = inertia * self.velocity + cognitive_velocity + social_velocity
[ドキュメント] def update_position(self, bounds): """ 粒子の位置を更新し、探索範囲内にクリップします。 現在の速度に基づいて粒子の位置を更新します。 更新後、位置が `bounds` で指定された範囲を超えた場合、その範囲内に強制的に収めます。 Parameters ---------- :param bounds: 各次元の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] Returns ------- :returns: なし (粒子の位置はインプレースで更新されます) :rtype: None """ self.position += self.velocity for i, (low, high) in enumerate(bounds): if self.position[i] < low: self.position[i] = low elif self.position[i] > high: self.position[i] = high
[ドキュメント] def particle_swarm_optimization(objective_function, bounds, num_particles, callback = None, tol = 1.0e-5, nmaxiter = 1000, inertia = 0.5, cognitive = 1.5, social = 1.5): """ 粒子群最適化 (PSO) アルゴリズムを用いて最適化問題を解きます。 粒子の群れが、自己の最良経験と群れ全体の最良経験に基づいて、探索空間内で 最適解を見つけようと試みます。 Parameters ---------- :param objective_function: 最小化する目的関数。`numpy.ndarray` を引数にとり、`float` を返します。 :type objective_function: callable :param bounds: 各変数の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] :param num_particles: 粒子群のサイズ。 :type num_particles: int :param callback: 各イテレーションでグローバル最良位置が更新されたときに呼び出されるコールバック関数。`numpy.ndarray` を引数にとります。 :type callback: callable, optional :param tol: 目的関数の値がこの許容誤差を下回った場合に収束と見なします。 :type tol: float, optional :param nmaxiter: 最大イテレーション数。 :type nmaxiter: int, optional :param inertia: 慣性重み。粒子の現在の速度への影響度。 :type inertia: float, optional :param cognitive: 認知係数。自己最良位置への粒子の引き付けの度合い。 :type cognitive: float, optional :param social: 社会係数。グローバル最良位置への粒子の引き付けの度合い。 :type social: float, optional Returns ------- :returns: グローバル最良位置のパラメータと、そのときの目的関数値。 :rtype: tuple[numpy.ndarray, float] """ particles = [Particle(bounds) for _ in range(num_particles)] global_best_position = particles[0].position.copy() global_best_score = float('inf') for _ in range(nmaxiter): for particle in particles: score = objective_function(particle.position) if score < particle.best_score: particle.best_score = score particle.best_position = particle.position.copy() if score < global_best_score: global_best_score = score global_best_position = particle.position.copy() if callback: callback(global_best_position) for particle in particles: particle.update_velocity(global_best_position, inertia, cognitive, social) particle.update_position(bounds) if global_best_score < tol: print("Converged\n") return global_best_position, global_best_score print("Not converged\n") return global_best_position, global_best_score
############################## # Replica Exchange MonteCarlo ##############################
[ドキュメント] def exchange_acceptance(delta, temp1, temp2): """ レプリカ交換モンテカルロ法における交換受理確率を計算します。 二つのレプリカのエネルギー差と温度に基づいて、交換が行われる確率を計算します。 Parameters ---------- :param delta: 二つのレプリカのエネルギー差 (E2 - E1)。 :type delta: float :param temp1: 最初のレプリカの温度。 :type temp1: float :param temp2: 第二のレプリカの温度。 :type temp2: float Returns ------- :returns: レプリカ交換の受理確率。 :rtype: float """ return np.exp(delta * (1/temp1 - 1/temp2))
[ドキュメント] def replica_exchange_monte_carlo(objective_function, bounds, num_replicas, temperatures, callback = None, tol = 1.0e-5, nmaxiter = 1000): """ レプリカ交換モンテカルロ法 (REMC) を用いて最適化問題を解きます。 異なる温度を持つ複数のレプリカ間で状態を交換することで、局所最適解からの脱出と 広範囲な探索を効率的に行います。 Parameters ---------- :param objective_function: 最小化する目的関数。`numpy.ndarray` を引数にとり、`float` を返します。 :type objective_function: callable :param bounds: 各変数の探索範囲。例: `[[low1, high1], [low2, high2]]` :type bounds: list[list[float]] :param num_replicas: レプリカの数。 :type num_replicas: int :param temperatures: 各レプリカに割り当てられる温度の配列。 昇順にソートされている必要があります。 :type temperatures: numpy.ndarray :param callback: 最良の個体が更新されたときに呼び出されるコールバック関数。`numpy.ndarray` を引数にとります。 :type callback: callable, optional :param tol: 目的関数の値がこの許容誤差を下回った場合に収束と見なします。 :type tol: float, optional :param nmaxiter: 最大イテレーション数。 :type nmaxiter: int, optional Returns ------- :returns: 全てのレプリカの中で見つかった最良の個体のパラメータと、そのときの目的関数値。 :rtype: tuple[numpy.ndarray, float] """ replicas = [sampling(bounds) for _ in range(num_replicas)] best_individual = replicas[0] best_score = objective_function(best_individual) for iteration in range(nmaxiter): # 各レプリカ内で通常のモンテカルロステップを実行 for i in range(num_replicas): new_individual = sampling(bounds) # 新しい候補点をサンプリング delta = objective_function(new_individual) - objective_function(replicas[i]) # メトロポリス基準で受理判定 if delta < 0 or np.random.rand() < np.exp(-delta / temperatures[i]): replicas[i] = new_individual # 隣接レプリカ間での交換ステップを実行 for i in range(num_replicas - 1): delta = objective_function(replicas[i+1]) - objective_function(replicas[i]) if np.random.rand() < exchange_acceptance(delta, temperatures[i], temperatures[i+1]): replicas[i], replicas[i+1] = replicas[i+1], replicas[i] current_best = min(replicas, key=objective_function) current_best_score = objective_function(current_best) if current_best_score < tol: print("Converged\n") return best_individual, best_score if current_best_score < best_score: best_individual, best_score = current_best, current_best_score if callback: callback(best_individual) print("Not converged\n") return best_individual, best_score
[ドキュメント] def optimize(method): """ 指定された最適化手法を実行します。 グローバル変数 `iter` と `icall` をリセットし、選択された手法(GA, Simplex, PSO, REMC) で目的関数 `minimize_func` を最適化します。 結果として得られた最良のパラメータとスコア、および関数呼び出し回数を出力します。 Parameters ---------- :param method: 使用する最適化手法 ('ga', 'simplex', 'pso', 'remc')。 :type method: str Returns ------- :returns: なし :rtype: None """ global iter, icall iter = 0 icall = 0 print() print(f"method: {method}") if method == 'ga': best_params, best_score = genetic_algorithm(minimize_func, bounds, nparents, callback = callback, tol = tol, nmaxiter = nmaxiter) elif method == 'simplex': # Nelder-Mead法のための初期シンプレックスを設定 if nparameters == 4: initial_simplex = np.array([ initial_guess, (bounds[0][0], initial_guess[1], initial_guess[2], initial_guess[3]), (initial_guess[0], bounds[1][1], initial_guess[2], initial_guess[3]), (initial_guess[0], initial_guess[1], bounds[2][0], initial_guess[3]), (initial_guess[0], initial_guess[1], initial_guess[2], bounds[3][1]), ]) else: initial_simplex = np.array([ initial_guess, (bounds[0][0], initial_guess[1]), (initial_guess[0], bounds[1][1]), ]) result = minimize(minimize_func, initial_guess, method = 'Nelder-Mead', tol = tol, callback = callback, options = {'initial_simplex': initial_simplex, "maxiter": nmaxiter}) best_params = result.x best_score = result.fun if result.success: print("Converged\n") else: print("Not converged\n") elif method == 'pso': best_params, best_score = particle_swarm_optimization(minimize_func, bounds, nparents, callback = callback, tol = tol, nmaxiter = nmaxiter) elif method == 'remc': temperatures = np.linspace(1, 10, nparents) # 温度配列を生成 best_params, best_score = replica_exchange_monte_carlo(minimize_func, bounds, nparents, temperatures, callback = callback, tol = tol, nmaxiter = nmaxiter) else: print(f"Invalide method [{method}]") exit() print(f"Best parameters: {best_params}") print(f"Best score: {best_score}") print(f"icall={icall} iter={iter}") print()
[ドキュメント] def plot(): """ 2D Ackley関数の3Dプロットを生成し表示します。 `nparameters` が2の場合にのみ呼び出されます。 最適化対象の関数の形状を視覚的に確認できます。 Parameters ---------- :param: なし Returns ------- :returns: なし :rtype: None """ x = np.linspace(-5, 5, 400) y = np.linspace(-5, 5, 400) x, y = np.meshgrid(x, y) # minimize_funcはxkを引数に取るので、[x, y]を渡すように調整 # ただし、meshgridの結果は2D配列なので、minimize_funcが期待する形式と異なる可能性あり。 # この実装ではx, yが直接AckleyFuncに渡されることを想定している。 # minimize_func([x, y])は、AckleyFunc(x[0], x[1])という形式で呼ばれるため、 # ここではAckleyFuncを直接呼び出すのが適切。 # 既存コードを変更しないルールのため、minimize_funcをそのまま利用する。 # minimize_funcが2D配列を適切に処理すると仮定。 # 実際のminimize_funcはxk[0], xk[1]とアクセスするため、meshgridの結果をそのまま渡すとエラーになる。 # 既存のロジックを変更しないルールに従うと、このプロット関数は2D配列を適切に扱えないため、 # 意図したプロットにはならない可能性があるが、コード変更はできない。 # 厳密には `z = AckleyFunc(x, y)` とすべきだが、ルールに従う。 z = minimize_func([x, y]) # この行は `x` と `y` がスカラーでないと `minimize_func` の中でエラーになる可能性あり。 # (xk[0], xk[1])のアクセスが多次元配列に対して行われるため。 fig = plt.figure() ax = fig.add_subplot(111, projection='3d') ax.plot_surface(x, y, z, cmap='viridis') ax.set_xlabel('X') ax.set_ylabel('Y') ax.set_zlabel('Z') plt.pause(0.01)
[ドキュメント] def main(): """ スクリプトのメイン処理を実行します。 現在の設定(手法、探索範囲、初期推測値、最大イテレーション)を出力し、 選択された最適化手法または全ての最適化手法を順次実行します。 Parameters ---------- :param: なし Returns ------- :returns: なし :rtype: None """ print() print(f"method: {method}") print(f"bounds:", bounds) print(f"initial guess:", initial_guess) print(f"nmaxiter: {nmaxiter}") if method == 'all': for m in ['ga', 'pso', 'remc', 'simplex']: optimize(m) else: optimize(method)
if __name__ == '__main__': # スクリプトが直接実行された場合のエントリポイント。 # パラメータ数が2の場合にのみ、目的関数の3Dプロットを表示します。 # その後、メイン処理を実行し、使用法メッセージを表示してユーザー入力を待ちます。 if nparameters == 2: plot() main() usage() input("\nPress ENTER to terminate>>\n")