r"""
xrd_fit.py: X線回折(XRD)シミュレーションおよびフィッティングツール
このモジュールは、xrayutilitiesライブラリを使用して動的理論に基づいたXRDシミュレーションを実行し、
実験データに対して構造パラメータ(組成、緩和度、膜厚)を最適化します。
また、フリンジ解析による膜厚の自動推定機能も備えています。
関連リンク: :doc:`xrd_fit_usage`
"""
import argparse
import csv
import os
import random
import sys
import traceback
import numpy as np
import matplotlib.pyplot as plt
import xrayutilities as xu
from scipy.optimize import minimize
from scipy.signal import savgol_filter, find_peaks
# ============================================================
# 定数 / デフォルト値
# ============================================================
DEFAULT_MODE = "sim"
DEFAULT_METHOD = "pso"
DEFAULT_INFILE = ""
DEFAULT_FIX = ""
DEFAULT_SUBSTRATE_FILE = "GaN.cif"
DEFAULT_FILM_FILE = "AlN.cif"
DEFAULT_NMAXITER = 1000
DEFAULT_TOL = 1e-7
DEFAULT_YSCALE = "log"
DEFAULT_RESIDUAL_SCALE = "log"
TARGET_X = 0.2
TARGET_RELAX = 0.0
TARGET_THICK = 1500.0
TRY_X = 0.0
TRY_RELAX = 1.0
TRY_THICK = 1500.0
TWOTHETA_MIN = 32.0
TWOTHETA_MAX = 38.0
NPOINTS = 200
RESOLUTION_WIDTH = 0.0001
HKL = (0, 0, 2)
LIMIT = 0.01
MAX_STALL = 5000
NINTERVAL_SAVE = 100
NINTERVAL_PRINT = 10
NINTERVAL_PLOT = 10
XRANGE = 1.0
RRANGE = 1.0
TRANGE = 10.0
PARAMETER_NAMES = ["x", "relax", "thick"]
DEFAULT_PRIMARY_SET = "best0"
DEFAULT_PSO_PREFIX = "pso"
DEFAULT_GUESS_LOW_DEG = 16.0
DEFAULT_GUESS_HIGH_DEG = 17.3
DEFAULT_NSMOOTH_POINTS = 31
DEFAULT_NSMOOTH_ORDER = 3
DEFAULT_NQ_UNIFORM = 4096
DEFAULT_NGUESS_KEEP = 5
DEFAULT_PSO_NPARTICLES = 12
DEFAULT_PSO_W = 0.72
DEFAULT_PSO_C1 = 1.49
DEFAULT_PSO_C2 = 1.49
DEFAULT_CLUSTER_GAP_FACTOR = 1.6
# ============================================================
# argparse
# ============================================================
[ドキュメント]
def build_parser():
"""
コマンドライン引数を解析するためのパーサーを構築します。
詳細説明:
この関数はargparseモジュールを使用して、XRDシミュレーション、フィッティング、
および膜厚推定ツールが受け入れるコマンドライン引数を定義します。
各引数にはデフォルト値、選択肢、ヘルプメッセージが含まれます。
Returns:
argparse.ArgumentParser: 設定済みのパーサーオブジェクト。
"""
parser = argparse.ArgumentParser(
description="動的理論に基づくXRDシミュレーション、フィッティング、および膜厚推定"
)
parser.add_argument(
"--mode",
default=DEFAULT_MODE,
choices=["sim", "read", "fit", "guess"],
help="実行モード: sim (シミュレーション), read (読込), fit (最適化), guess (膜厚推定)",
)
parser.add_argument(
"--method",
default=DEFAULT_METHOD,
choices=["random", "nelder-mead", "bfgs", "cg", "pso"],
help="最適化手法。fitモード時に有効",
)
parser.add_argument(
"--infile",
default=DEFAULT_INFILE,
help='2theta-intensityデータを含むテキストファイルへのパス',
)
parser.add_argument(
"--substrate_file",
default=DEFAULT_SUBSTRATE_FILE,
help=f'基板のCIFファイルパス (デフォルト: {DEFAULT_SUBSTRATE_FILE})',
)
parser.add_argument(
"--film_file",
default=DEFAULT_FILM_FILE,
help=f'膜のCIFファイルパス (デフォルト: {DEFAULT_FILM_FILE})',
)
parser.add_argument(
"--fix",
default=DEFAULT_FIX,
help='固定するパラメータ(カンマ区切り、例: "x,relax")',
)
parser.add_argument(
"--nmaxiter",
type=int,
default=DEFAULT_NMAXITER,
help=f"最大反復回数 (デフォルト: {DEFAULT_NMAXITER})",
)
parser.add_argument(
"--tol",
type=float,
default=DEFAULT_TOL,
help=f"収束判定の許容誤差 (デフォルト: {DEFAULT_TOL})",
)
parser.add_argument(
"--yscale",
default=DEFAULT_YSCALE,
choices=["linear", "log"],
help=f'プロットのY軸スケール (デフォルト: "{DEFAULT_YSCALE}")',
)
parser.add_argument(
"--residual_scale",
default=DEFAULT_RESIDUAL_SCALE,
choices=["linear", "log"],
help=f'残差計算に使用するスケール (デフォルト: "{DEFAULT_RESIDUAL_SCALE}")',
)
parser.add_argument(
"--guess_low",
type=float,
default=DEFAULT_GUESS_LOW_DEG,
help="guessモードでの解析開始角度 [deg]",
)
parser.add_argument(
"--guess_high",
type=float,
default=DEFAULT_GUESS_HIGH_DEG,
help="guessモードでの解析終了角度 [deg]",
)
parser.add_argument(
"--nsmooth_points",
type=int,
default=DEFAULT_NSMOOTH_POINTS,
help="guessモードでの平滑化ウィンドウ点数",
)
parser.add_argument(
"--nsmooth_order",
type=int,
default=DEFAULT_NSMOOTH_ORDER,
help="guessモードでの平滑化多項式次数",
)
parser.add_argument(
"--nguess_keep",
type=int,
default=DEFAULT_NGUESS_KEEP,
help="guessモードで保持する膜厚候補の数",
)
parser.add_argument(
"--cluster_gap_factor",
type=float,
default=DEFAULT_CLUSTER_GAP_FACTOR,
help=f"フリンジクラスター判定用のギャップ係数 (デフォルト: {DEFAULT_CLUSTER_GAP_FACTOR})",
)
parser.add_argument(
"--pso_nparticles",
type=int,
default=DEFAULT_PSO_NPARTICLES,
help=f"PSOの粒子数 (デフォルト: {DEFAULT_PSO_NPARTICLES})",
)
parser.add_argument(
"--pso_w",
type=float,
default=DEFAULT_PSO_W,
help=f"PSOの慣性重み (デフォルト: {DEFAULT_PSO_W})",
)
parser.add_argument(
"--pso_c1",
type=float,
default=DEFAULT_PSO_C1,
help=f"PSOの自己学習係数 (デフォルト: {DEFAULT_PSO_C1})",
)
parser.add_argument(
"--pso_c2",
type=float,
default=DEFAULT_PSO_C2,
help=f"PSOの社会学習係数 (デフォルト: {DEFAULT_PSO_C2})",
)
parser.add_argument(
"--pso_stall_max",
type=int,
default=15,
help="改善がない場合にPSOを停止する最大反復数",
)
parser.add_argument(
"--pso_spread_rtol",
type=float,
default=0.10,
help="粒子の分散に基づく停止条件の相対許容誤差",
)
return parser
[ドキュメント]
def initialize(parser):
"""
コマンドライン引数を解析し、結果を返します。
Parameters:
:param parser: argparse.ArgumentParserオブジェクト。
:returns: argparse.Namespace: 解析された引数を含むオブジェクト。
"""
return parser.parse_args()
# ============================================================
# ファイル名ヘルパー
# ============================================================
[ドキュメント]
def get_parameter_filename(infile):
"""
入力ファイル名に基づいてパラメータ保存用のCSVファイル名を生成します。
詳細説明:
入力ファイル名が存在する場合はそのファイル名から拡張子を除いた名前に "-parameters.csv" を追加します。
入力ファイル名が空の場合は "parameters.csv" を返します。
Parameters:
:param infile: str: 入力データファイル名。
Returns:
str: パラメータCSVファイル名。
"""
if infile == "":
return "parameters.csv"
filebody, _ = os.path.splitext(infile)
return f"{filebody}-parameters.csv"
[ドキュメント]
def get_guess_peaks_filename(infile):
"""
入力ファイル名に基づいて膜厚推定モードで検出したピークの保存用CSVファイル名を生成します。
詳細説明:
入力ファイル名が存在する場合はそのファイル名から拡張子を除いた名前に "-guess-peaks.csv" を追加します。
入力ファイル名が空の場合は "guess-peaks.csv" を返します。
Parameters:
:param infile: str: 入力データファイル名。
Returns:
str: ピークデータCSVファイル名。
"""
if infile == "":
return "guess-peaks.csv"
filebody, _ = os.path.splitext(infile)
return f"{filebody}-guess-peaks.csv"
[ドキュメント]
def get_guess_clusters_filename(infile):
"""
入力ファイル名に基づいて膜厚推定モードで解析したクラスターの保存用CSVファイル名を生成します。
詳細説明:
入力ファイル名が存在する場合はそのファイル名から拡張子を除いた名前に "-guess-clusters.csv" を追加します。
入力ファイル名が空の場合は "guess-clusters.csv" を返します。
Parameters:
:param infile: str: 入力データファイル名。
Returns:
str: クラスターデータCSVファイル名。
"""
if infile == "":
return "guess-clusters.csv"
filebody, _ = os.path.splitext(infile)
return f"{filebody}-guess-clusters.csv"
[ドキュメント]
def parse_fix_list(fix_str):
"""
固定パラメータの文字列をリストに変換します。
詳細説明:
カンマ区切りの文字列を受け取り、各パラメータ名を小文字に変換し、
空白を除去したリストとして返します。空の文字列は空のリストを返します。
Parameters:
:param fix_str: str: カンマ区切りのパラメータ名文字列(例: "x,relax")。
Returns:
list[str]: 小文字に統一された固定するパラメータ名のリスト。
"""
if fix_str.strip() == "":
return []
return [s.strip().lower() for s in fix_str.split(",") if s.strip() != ""]
# ============================================================
# パラメータユーティリティ
# ============================================================
[ドキュメント]
def make_default_parameter_set():
"""
単一のパラメータセット(組成、緩和度、膜厚)の初期辞書を作成します。
詳細説明:
各パラメータには 'value' (現在の値) と 'optid' (最適化対象フラグ: 1=対象, 0=固定) が含まれます。
初期値は定数 `TRY_X`, `TRY_RELAX`, `TRY_THICK` から取得され、
全て最適化対象として設定されます。
Returns:
dict[str, dict[str, float or int]]: 初期値と最適化フラグを含むパラメータ辞書。
例: `{"x": {"value": 0.0, "optid": 1}, ...}`
"""
return {
"x": {"value": TRY_X, "optid": 1},
"relax": {"value": TRY_RELAX, "optid": 1},
"thick": {"value": TRY_THICK, "optid": 1},
}
[ドキュメント]
def clone_parameter_set(param_set):
"""
指定されたパラメータセットを深くコピーします。
詳細説明:
元のパラメータセットの内容を変更せずに新しい独立した辞書を作成するために使用されます。
これにより、最適化プロセス中にパラメータが不意に変更されるのを防ぎます。
Parameters:
:param param_set: dict: コピーするパラメータセット。
Returns:
dict: コピーされたパラメータセット。
"""
out = {}
for name in param_set:
out[name] = {
"value": float(param_set[name]["value"]),
"optid": int(param_set[name]["optid"]),
}
return out
[ドキュメント]
def make_default_parameter_sets():
"""
デフォルトのプライマリパラメータセットを含む、複数のパラメータセットを管理する辞書を作成します。
詳細説明:
`DEFAULT_PRIMARY_SET` (通常 'best0') というキーで、
`make_default_parameter_set()` で生成された初期パラメータセットを保持します。
これは、複数の最適化粒子や候補を管理する際に基盤となる構造を提供します。
Returns:
dict[str, dict]: デフォルトのプライマリセットを含むパラメータ辞書全体の構造。
"""
return {DEFAULT_PRIMARY_SET: make_default_parameter_set()}
[ドキュメント]
def apply_fix_list(param_sets, fix_list):
"""
指定されたパラメータの最適化フラグをオフ(0)に設定します。
詳細説明:
`param_sets` 内の全てのパラメータセットに対して、`fix_list` に含まれるパラメータの
`optid` (最適化対象フラグ) を0に設定し、最適化から除外します。
Parameters:
:param param_sets: dict: 変更対象のパラメータ辞書 (複数のセットを含む)。
:param fix_list: list[str]: 固定するパラメータ名のリスト。
"""
for set_name in param_sets:
for varname in fix_list:
if varname in param_sets[set_name]:
param_sets[set_name][varname]["optid"] = 0
[ドキュメント]
def read_parameters(parameter_file):
"""
CSVファイルからパラメータ設定を読み込みます。ファイルが存在しない場合はデフォルト値を返します。
詳細説明:
指定されたCSVファイルからパラメータ名、値、最適化IDを読み込み、
`make_default_parameter_sets` の形式に沿った辞書構造を構築します。
CSVファイルのヘッダーが不正な場合や、ファイルが空の場合は、警告を出力し
デフォルトのパラメータセットを返します。
Parameters:
:param parameter_file: str: パラメータCSVファイルへのパス。
Returns:
dict[str, dict]: 読み込まれたパラメータセットの辞書。
ファイルが存在しない場合や無効な場合は、デフォルトのパラメータセットが返されます。
"""
param_sets = make_default_parameter_sets()
if not os.path.isfile(parameter_file):
return param_sets
rows = []
with open(parameter_file, "r", newline="", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
rows.append(row)
if len(rows) == 0:
return param_sets
header = rows[0]
if len(header) < 3 or header[0] != "varname" or header[1] != "value" or header[2] != "optid":
print(f"Warning: invalid parameter CSV header in {parameter_file}. Use defaults.")
return param_sets
loaded = {}
for row in rows[1:]:
if len(row) < 3:
continue
varname_raw = row[0].strip()
value_str = row[1].strip()
optid_str = row[2].strip()
if varname_raw == "":
continue
if ":" in varname_raw:
set_name, varname = varname_raw.split(":", 1)
set_name = set_name.strip()
varname = varname.strip()
else:
set_name = DEFAULT_PRIMARY_SET
varname = varname_raw
if set_name == "" or varname == "":
continue
if varname not in PARAMETER_NAMES:
continue
try:
value = float(value_str)
optid = int(optid_str)
except ValueError:
continue
if set_name not in loaded:
loaded[set_name] = make_default_parameter_set()
loaded[set_name][varname]["value"] = value
loaded[set_name][varname]["optid"] = 1 if optid != 0 else 0
if len(loaded) > 0:
param_sets = loaded
if DEFAULT_PRIMARY_SET not in param_sets:
param_sets[DEFAULT_PRIMARY_SET] = make_default_parameter_set()
return param_sets
[ドキュメント]
def save_parameters(parameter_file, param_sets):
"""
現在のパラメータセットをCSVファイルに書き出します。
詳細説明:
`param_sets` 辞書に格納されている全てのパラメータ(複数のセットを含む)を、
"varname", "value", "optid" のヘッダーを持つCSV形式で指定されたファイルに保存します。
各パラメータ名は `set_name:varname` の形式で保存されます。
Parameters:
:param parameter_file: str: パラメータを保存するファイルパス。
:param param_sets: dict[str, dict]: 保存するパラメータデータ。
"""
rows = [["varname", "value", "optid"]]
set_names = sorted(param_sets.keys())
for set_name in set_names:
for varname in PARAMETER_NAMES:
entry = param_sets[set_name][varname]
rows.append(
[
f"{set_name}:{varname}",
f"{entry['value']:.16g}",
str(int(entry["optid"])),
]
)
with open(parameter_file, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(rows)
[ドキュメント]
def save_guess_peak_csv(outfile, peak_rows):
"""
膜厚推定モードで検出したピーク情報をCSVファイルに保存します。
Parameters:
:param outfile: str: 保存先ファイルパス。
:param peak_rows: list[dict]: ピーク情報を含む辞書のリスト。
各辞書は `cluster_id`, `n`, `q`, `twotheta`, `used_stage1`, `used_stage2`,
`resid_stage1`, `resid_stage2` のキーを持つと期待されます。
"""
rows = [["cluster_id", "n", "q", "2theta", "used_stage1", "used_stage2", "resid_stage1", "resid_stage2"]]
for row in peak_rows:
rows.append(
[
str(row["cluster_id"]),
str(row["n"]),
f"{row['q']:.16g}",
f"{row['twotheta']:.16g}",
str(int(row["used_stage1"])),
str(int(row["used_stage2"])),
f"{row['resid_stage1']:.16g}",
f"{row['resid_stage2']:.16g}",
]
)
with open(outfile, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(rows)
[ドキュメント]
def save_guess_cluster_csv(outfile, cluster_rows):
"""
膜厚推定モードで解析したクラスター(膜厚候補)情報をCSVファイルに保存します。
Parameters:
:param outfile: str: 保存先ファイルパス。
:param cluster_rows: list[dict]: クラスター情報を含む辞書のリスト。
各辞書は `cluster_id`, `npeaks`, `dq_est`, `thick_fft`, `thick_reg`, `confidence`
のキーを持つと期待されます。
"""
rows = [["cluster_id", "npeaks", "dq_est", "thick_fft", "thick_reg", "confidence"]]
for row in cluster_rows:
rows.append(
[
str(row["cluster_id"]),
str(row["npeaks"]),
f"{row['dq_est']:.16g}",
f"{row['thick_fft']:.16g}",
f"{row['thick_reg']:.16g}",
f"{row['confidence']:.16g}",
]
)
with open(outfile, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(rows)
[ドキュメント]
def parameter_set_to_values(param_set):
"""
パラメータセット辞書から組成(x)、緩和度(relax)、膜厚(thick)のタプルを抽出します。
Parameters:
:param param_set: dict: パラメータセット辞書。
Returns:
tuple[float, float, float]: (x, relax, thick) の値のタプル。
"""
return (
float(param_set["x"]["value"]),
float(param_set["relax"]["value"]),
float(param_set["thick"]["value"]),
)
[ドキュメント]
def get_optimize_names(param_set):
"""
指定されたパラメータセット内で最適化対象(optid=1)となっているパラメータ名のリストを取得します。
Parameters:
:param param_set: dict: パラメータセット辞書。
Returns:
list[str]: 最適化対象のパラメータ名("x", "relax", "thick")のリスト。
"""
names = []
for name in PARAMETER_NAMES:
if int(param_set[name]["optid"]) != 0:
names.append(name)
return names
[ドキュメント]
def pack_optimize_values(param_set, optimize_names):
"""
最適化対象のパラメータの値をNumPy配列にパックします。
Parameters:
:param param_set: dict: 基準となるパラメータセット。
:param optimize_names: list[str]: 最適化対象のパラメータ名のリスト。
Returns:
numpy.ndarray: 最適化対象のパラメータ値を含むNumPy配列。
"""
return np.array([param_set[name]["value"] for name in optimize_names], dtype=float)
[ドキュメント]
def unpack_optimize_values(base_param_set, optimize_names, xvec):
"""
NumPy配列の値を、指定されたパラメータ名の順序に従って元のパラメータ辞書形式に戻します。
詳細説明:
`base_param_set` をクローンし、`optimize_names` の順序で `xvec` の値で更新します。
これにより、最適化ルーチンから返された値をパラメータセットに適用できます。
Parameters:
:param base_param_set: dict: 基準となるパラメータセット(最適化対象外の値を保持するために使用)。
:param optimize_names: list[str]: 最適化対象のパラメータ名のリスト。
:param xvec: numpy.ndarray: 最適化によって得られたパラメータ値の配列。
Returns:
dict: 更新されたパラメータセット。
"""
work = clone_parameter_set(base_param_set)
for i, name in enumerate(optimize_names):
work[name]["value"] = float(xvec[i])
return work
[ドキュメント]
def set_best_parameter_set(param_sets, param_set):
"""
現在の最良のパラメータセットを `param_sets` 辞書内の `DEFAULT_PRIMARY_SET` キーに保存します。
詳細説明:
これにより、最適化の進捗に合わせて最良の解が常に追跡され、
プログラムの実行終了時や中断時に最新の最良パラメータが保存されます。
Parameters:
:param param_sets: dict: 複数のパラメータセットを管理する辞書。
:param param_set: dict: 最良とみなされる単一のパラメータセット。
"""
param_sets[DEFAULT_PRIMARY_SET] = clone_parameter_set(param_set)
[ドキュメント]
def replace_pso_parameter_sets(param_sets, candidate_sets):
"""
PSOモード用の粒子パラメータセットを更新します。
詳細説明:
`param_sets` からPSO関連の既存のパラメータセットを削除し、
`candidate_sets` から新しいPSO粒子パラメータセットを追加します。
PSOのプレフィックスを持たないセットは保持されます。
Parameters:
:param param_sets: dict: 現在の全てのパラメータセット。
:param candidate_sets: list[dict]: PSOの新しい粒子パラメータセットのリスト。
Returns:
dict: PSO粒子パラメータが更新された新しいパラメータセット辞書。
"""
keep_names = [name for name in param_sets.keys() if not name.startswith(DEFAULT_PSO_PREFIX)]
new_sets = {}
for name in keep_names:
new_sets[name] = clone_parameter_set(param_sets[name])
for i, param_set in enumerate(candidate_sets):
set_name = f"{DEFAULT_PSO_PREFIX}{i}"
new_sets[set_name] = clone_parameter_set(param_set)
if DEFAULT_PRIMARY_SET not in new_sets:
new_sets[DEFAULT_PRIMARY_SET] = make_default_parameter_set()
return new_sets
[ドキュメント]
def get_pso_parameter_sets(param_sets):
"""
`DEFAULT_PSO_PREFIX` (通常 "pso") で始まるキーを持つパラメータセットを全て取得します。
詳細説明:
これはPSOアルゴリズムで使用される各粒子の位置を初期化または取得するために使用されます。
取得されたセットは、元の辞書から独立したコピーとして提供されます。
Parameters:
:param param_sets: dict: 複数のパラメータセットを管理する辞書。
Returns:
list[dict]: PSO粒子として使用されるパラメータセットのリスト。
"""
names = sorted([name for name in param_sets.keys() if name.startswith(DEFAULT_PSO_PREFIX)])
return [clone_parameter_set(param_sets[name]) for name in names]
# ============================================================
# 材料 / モデル準備
# ============================================================
[ドキュメント]
def prepare_materials(substrate_file, film_file):
"""
CIFファイルから基板と膜の材料オブジェクトを準備し、弾性定数を設定します。
詳細説明:
指定されたCIFファイルパスからX-rayUtilitiesの `Crystal` オブジェクトをロードします。
材料がHexagonal対称性を持つことを確認し、もし弾性テンソルが定義されていない場合は
デフォルトの六方晶系弾性定数を割り当てて、歪み計算が続行できるようにします。
ファイルが見つからない場合や読み込みエラーが発生した場合は、プログラムを終了します。
Parameters:
:param substrate_file: str: 基板のCIFファイルパス。
:param film_file: str: 膜のCIFファイルパス。
Returns:
tuple[xrayutilities.materials.Crystal, xrayutilities.materials.Crystal]:
(substrate, film) の材料オブジェクト。
"""
if not os.path.isfile(substrate_file):
print(f"Error: substrate_file not found : {substrate_file}")
sys.exit(1)
if not os.path.isfile(film_file):
print(f"Error: film_file not found : {film_file}")
sys.exit(1)
try:
substrate = xu.materials.Crystal.fromCIF(substrate_file)
except Exception as e:
print(f"Error reading substrate CIF : {substrate_file}")
print(e)
sys.exit(1)
try:
film = xu.materials.Crystal.fromCIF(film_file)
except Exception as e:
print(f"Error reading film CIF : {film_file}")
print(e)
sys.exit(1)
substrate.symm_class_name = "Hexagonal"
film.symm_class_name = "Hexagonal"
def ensure_elastic(material, name, default_cij):
"""
材料オブジェクトに弾性定数が定義されていることを確認し、
存在しない場合はデフォルト値を割り当てます。
"""
has_elastic = True
try:
material.elastic
except Exception:
has_elastic = False
try:
material.cijkl
except Exception:
has_elastic = False
if has_elastic:
return
print()
print(f"Warning: elastic tensor not defined for {name}")
print("Strain calculation may be inaccurate.")
print("Default hexagonal elastic constants will be used and calculation continues.")
print()
c11, c12, c13, c33, c44 = default_cij
cij = xu.materials.material.HexagonalElasticTensor(c11, c12, c13, c33, c44)
material.elastic = cij
material.cijkl = xu.materials.material.Cij2Cijkl(cij)
ensure_elastic(substrate, "substrate", (390, 145, 106, 398, 105))
ensure_elastic(film, "film", (396, 137, 108, 373, 116))
return substrate, film
[ドキュメント]
def make_scan_axis():
"""
XRDシミュレーション用のデフォルトの角度軸 (2theta, omega) を生成します。
詳細説明:
`TWOTHETA_MIN` から `TWOTHETA_MAX` の範囲で `NPOINTS` の数の2theta角度を生成し、
それに対応するomega角度 (2thetaの半分) を計算します。
Returns:
tuple[numpy.ndarray, numpy.ndarray]: (2theta角度の配列 [deg], omega角度の配列 [deg])。
"""
twotheta = np.linspace(TWOTHETA_MIN, TWOTHETA_MAX, NPOINTS)
omega = twotheta / 2.0
return twotheta, omega
[ドキュメント]
def model(substrate, film, x, relax, thick, energy):
"""
指定されたパラメータで動的XRDシミュレーションモデルを構築します。
詳細説明:
X-rayUtilitiesの `Layer` オブジェクトと `PseudomorphicStack001` を使用して、
基板と膜からなるスタックの動的理論モデルを定義します。
膜は基板と膜の合金であり、組成 `x`、緩和度 `relax`、膜厚 `thick` を持ちます。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト(合金のもう一方の端成分)。
:param x: float: 合金の組成比 (0.0 から 1.0)。
:param relax: float: 膜の緩和度 (0.0 は完全擬似格子、1.0 は完全緩和)。
:param thick: float: 膜の厚さ [Å]。
:param energy: float: X線エネルギー [eV]。
Returns:
xrayutilities.simpack.DynamicalModel: 構築された動的理論モデル。
"""
substrate_layer = xu.simpack.Layer(substrate, float("inf"))
alloy = xu.materials.material.Alloy(substrate, film, x)
film_layer = xu.simpack.Layer(alloy, thick, relaxation=relax)
stack = xu.simpack.PseudomorphicStack001("film/substrate", substrate_layer + film_layer)
dyn_model = xu.simpack.DynamicalModel(
stack,
energy=energy,
resolution_width=RESOLUTION_WIDTH,
)
return dyn_model
[ドキュメント]
def simulate(dyn_model, omega):
"""
動的理論モデルを使用してXRD強度分布を計算します。
Parameters:
:param dyn_model: xu.simpack.DynamicalModel: 計算に使用する構築済みモデル。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
Returns:
numpy.ndarray: 計算されたXRD強度分布の配列。
"""
intensity = dyn_model.simulate(omega, hkl=HKL)
return np.asarray(intensity, dtype=float)
# ============================================================
# データ入出力
# ============================================================
[ドキュメント]
def read_text_data(infile):
"""
テキストファイルから2列データ (2theta, intensity) を読み込みます。
詳細説明:
指定されたテキストファイルから数値データを読み込み、
1列目を2theta角度、2列目を強度として解釈します。
ファイルが存在しない場合や、フォーマットが不正な場合はエラーで終了します。
omega角度は2thetaの半分として計算されます。
Parameters:
:param infile: str: 2theta-intensityデータを含むテキストファイルへのパス。
Returns:
tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
(2theta角度の配列 [deg], omega角度の配列 [deg], 強度値の配列)。
"""
if not os.path.isfile(infile):
print(f'Error: infile "{infile}" not found.')
sys.exit(1)
data = np.loadtxt(infile)
if data.ndim != 2 or data.shape[1] < 2:
print("Error: input file must contain at least two columns: 2theta intensity")
sys.exit(1)
twotheta = data[:, 0]
intensity = data[:, 1]
omega = twotheta / 2.0
return twotheta, omega, np.asarray(intensity, dtype=float)
[ドキュメント]
def read_data(infile, substrate, film, energy):
"""
XRDデータファイルを読み込むか、ファイルが指定されていない場合は模擬データを生成します。
詳細説明:
`infile` が空文字列の場合、`TARGET_X`, `TARGET_RELAX`, `TARGET_THICK`
で定義されたターゲットパラメータを使用して模擬XRDパターンを生成します。
`infile` が指定されている場合は、`read_text_data` を呼び出してファイルを読み込みます。
Parameters:
:param infile: str: 2theta-intensityデータを含むテキストファイルへのパス。
空の場合は模擬データを生成します。
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
Returns:
tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
(2theta角度の配列 [deg], omega角度の配列 [deg], 強度値の配列)。
"""
if infile == "":
twotheta, omega = make_scan_axis()
target_model = model(substrate, film, TARGET_X, TARGET_RELAX, TARGET_THICK, energy)
intensity = simulate(target_model, omega)
return twotheta, omega, intensity
return read_text_data(infile)
# ============================================================
# ユーティリティ
# ============================================================
[ドキュメント]
def clamp_values(x, relax, thick):
"""
組成(x)、緩和度(relax)、膜厚(thick)のパラメータを物理的に妥当な範囲にクランプします。
詳細説明:
- `x` は0.0から1.0の範囲に制限されます。
- `relax` は0.0から1.0の範囲に制限されます。
- `thick` は1.0以上の値に制限されます。
Parameters:
:param x: float: 組成比。
:param relax: float: 緩和度。
:param thick: float: 膜厚 [Å]。
Returns:
tuple[float, float, float]: クランプされた (x, relax, thick) の値。
"""
x = min(max(x, 0.0), 1.0)
relax = min(max(relax, 0.0), 1.0)
thick = max(thick, 1.0)
return x, relax, thick
[ドキュメント]
def calc_residual(int_target, int_try, residual_scale="log"):
"""
ターゲット強度とシミュレーション強度の残差 (L2ノルム) を計算します。
詳細説明:
`residual_scale` が "log" の場合、強度は対数スケールに変換されてから残差が計算されます。
これにより、強度の低い部分のフィッティング精度を向上させることができます。
"linear" の場合は線形スケールで計算されます。
強度がゼロになることを避けるため、非常に小さいイプシロン値 (`eps`) が使用されます。
Parameters:
:param int_target: numpy.ndarray: 実験データまたはターゲット強度。
:param int_try: numpy.ndarray: 計算データまたは試行強度。
:param residual_scale: str: 残差計算に使用するスケール ("log" または "linear")。
Returns:
float: 計算された残差の総和 (L2ノルムの和)。
"""
eps = 1e-30
if residual_scale == "log":
y_target = np.log(np.maximum(int_target, eps))
y_try = np.log(np.maximum(int_try, eps))
else:
y_target = int_target
y_try = int_try
return np.sum((y_target - y_try) ** 2)
[ドキュメント]
def update_rrange(res):
"""
残差値に基づいて、ランダム探索の探索範囲 (rrange) を動的に調整します。
詳細説明:
残差が大きい場合は探索範囲を広げ、小さい場合は狭めることで、
最適化の効率を向上させます。残差が5.0より大きい場合は0.5を返し、
それ以外の場合は残差の1/10を返します。
Parameters:
:param res: float: 現在の残差値。
Returns:
float: 更新された探索範囲の係数。
"""
if res > 5.0:
return 0.5
return res / 10.0
[ドキュメント]
def calc_pattern_from_param_set(substrate, film, energy, omega, param_set):
"""
パラメータセットを直接渡してシミュレーション強度を計算します。
詳細説明:
`param_set` から組成、緩和度、膜厚を抽出し、
`clamp_values` で物理的範囲にクランプした後、モデルを構築し強度をシミュレートします。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
:param param_set: dict: 評価するパラメータセット。
Returns:
numpy.ndarray: 計算されたXRD強度分布の配列。
"""
x, relax, thick = parameter_set_to_values(param_set)
x, relax, thick = clamp_values(x, relax, thick)
dyn_model = model(substrate, film, x, relax, thick, energy)
intensity = simulate(dyn_model, omega)
return intensity
[ドキュメント]
def objective_from_param_set(substrate, film, energy, omega, int_target, param_set, residual_scale="log"):
"""
最適化に使用する目的関数を計算します。残差に加えて物理的境界外へのペナルティを含みます。
詳細説明:
`param_set` からパラメータを抽出し、シミュレーションを実行して強度を計算します。
計算された強度とターゲット強度との残差を `calc_residual` で求めます。
また、組成(x)、緩和度(relax)、膜厚(thick)が物理的に妥当な範囲
(x:[0,1], relax:[0,1], thick>0) を逸脱した場合、
大きなペナルティが追加され、最適化がこれらの制約内で解を見つけるように誘導されます。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
:param int_target: numpy.ndarray: 実験データまたはターゲット強度。
:param param_set: dict: 評価するパラメータセット。
:param residual_scale: str: 残差計算に使用するスケール ("log" または "linear")。
Returns:
float: ペナルティ込みの残差の総和。
"""
x, relax, thick = parameter_set_to_values(param_set)
penalty = 0.0
if x < 0.0:
penalty += 1e6 * (0.0 - x) ** 2
if x > 1.0:
penalty += 1e6 * (x - 1.0) ** 2
if relax < 0.0:
penalty += 1e6 * (0.0 - relax) ** 2
if relax > 1.0:
penalty += 1e6 * (relax - 1.0) ** 2
if thick <= 0.0:
penalty += 1e6 * (1.0 - thick) ** 2 # thickは1.0未満にならないようにペナルティ
intensity = calc_pattern_from_param_set(substrate, film, energy, omega, param_set)
return calc_residual(int_target, intensity, residual_scale=residual_scale) + penalty
[ドキュメント]
def plot_xrd(omega, intensity_list, labels, title, yscale="log", text_lines=None, wait=False):
"""
複数のXRDパターンをプロットし表示します。
詳細説明:
与えられたomega軸と複数の強度配列をプロットします。
各強度配列には対応するラベルが付けられます。
Y軸のスケール("linear"または"log")とプロットのタイトルを設定できます。
追加のテキスト行をプロット領域に表示することも可能です。
Parameters:
:param omega: numpy.ndarray: 角度軸 (omega) の配列 [deg]。
:param intensity_list: list[numpy.ndarray]: プロットする強度配列のリスト。
:param labels: list[str]: 各強度配列に対応する凡例ラベルのリスト。
:param title: str: プロットのタイトル。
:param yscale: str: Y軸のスケール ("linear" または "log")。デフォルトは "log"。
:param text_lines: list[str] or None: プロット内に表示する追加テキスト行のリスト。
:param wait: bool: プロットウィンドウが閉じるまで待機するかどうか。Trueの場合、
ウィンドウが閉じられるまでプログラムの実行がブロックされます。
"""
plt.figure()
for intensity, label in zip(intensity_list, labels):
plt.plot(omega, intensity, label=label)
if text_lines:
y0 = 0.90
dy = 0.05
for i, line in enumerate(text_lines):
plt.text(0.55, y0 - i * dy, line, transform=plt.gca().transAxes)
plt.title(title)
plt.yscale(yscale)
plt.xlabel(r"$\omega\ (deg)$")
plt.ylabel("Intensity (arb. u.)")
plt.legend()
if wait:
plt.show()
else:
plt.show()
[ドキュメント]
def make_live_plot(omega, int_target, int_init, title, yscale="log"):
"""
フィッティングの進捗を表示するためのインタラクティブなライブプロットを作成します。
詳細説明:
初期の実験データ (`int_target`) と初期シミュレーション結果 (`int_init`) を表示し、
フィッティング中に更新される `CurrentFit` のためのラインオブジェクトを準備します。
また、現在のパラメータや残差を表示するためのテキストボックスも作成します。
Parameters:
:param omega: numpy.ndarray: 角度軸 (omega) の配列 [deg]。
:param int_target: numpy.ndarray: 実験データ強度。
:param int_init: numpy.ndarray: 初期シミュレーション強度。
:param title: str: プロットのタイトル。
:param yscale: str: Y軸のスケール ("linear" または "log")。デフォルトは "log"。
Returns:
tuple[matplotlib.figure.Figure, matplotlib.axes.Axes, matplotlib.lines.Line2D, matplotlib.text.Text]:
(Figureオブジェクト, Axesオブジェクト, CurrentFitのLine2Dオブジェクト, テキストボックスのTextオブジェクト)。
"""
plt.ion()
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(omega, int_target, label="InputData")
ax.plot(omega, int_init, label="InitialSimulation")
line_fit, = ax.plot(omega, int_init, label="CurrentFit")
text_box = ax.text(0.55, 0.92, "", transform=ax.transAxes, va="top", ha="left")
ax.set_title(title)
ax.set_yscale(yscale)
ax.set_xlabel(r"$\omega\ (deg)$")
ax.set_ylabel("Intensity (arb. u.)")
ax.legend()
plt.show(block=False)
fig.canvas.draw()
fig.canvas.flush_events()
plt.pause(0.1)
return fig, ax, line_fit, text_box
[ドキュメント]
def update_live_plot(fig, line_fit, text_box, omega, int_fit, text_lines):
"""
作成済みのライブプロットの内容を更新します。
詳細説明:
`CurrentFit` のラインデータを新しいフィッティング強度 (`int_fit`) で更新し、
テキストボックスの内容を `text_lines` で更新してプロットを再描画します。
これにより、最適化の進捗がリアルタイムで視覚的に確認できます。
Parameters:
:param fig: matplotlib.figure.Figure: 更新するFigureオブジェクト。
:param line_fit: matplotlib.lines.Line2D: `CurrentFit` のLine2Dオブジェクト。
:param text_box: matplotlib.text.Text: 更新するテキストボックスのTextオブジェクト。
:param omega: numpy.ndarray: 角度軸 (omega) の配列 [deg]。
:param int_fit: numpy.ndarray: 現在のフィッティング強度。
:param text_lines: list[str]: テキストボックスに表示するテキスト行のリスト。
"""
line_fit.set_data(omega, int_fit)
text_box.set_text("\n".join(text_lines))
fig.canvas.draw()
fig.canvas.flush_events()
plt.pause(0.001)
[ドキュメント]
def show_final_live_plot_wait(fig):
"""
ライブプロットを非インタラクティブモードに切り替え、ウィンドウが閉じるまで待機します。
詳細説明:
最適化が完了した後、最終結果を表示したプロットウィンドウを
ユーザーが手動で閉じるまで保持します。
Parameters:
:param fig: matplotlib.figure.Figure: 待機するFigureオブジェクト。
"""
plt.ioff()
fig.canvas.draw()
fig.canvas.flush_events()
plt.show()
[ドキュメント]
def make_initial_simplex(param_set, optimize_names):
"""
Nelder-Mead最適化法のための初期シンプレックスを生成します。
詳細説明:
初期シンプレックスは、`optimize_names` で指定された最適化対象パラメータの
現在の値 (`param_set`) を中心に、N+1個の頂点を持つ形状として生成されます。
各パラメータに対して、現在の値から大きく離れた値を設定した頂点を生成することで、
初期探索空間をカバーします。
Parameters:
:param param_set: dict: 基準となる初期パラメータセット。
:param optimize_names: list[str]: 最適化対象のパラメータ名のリスト。
Returns:
numpy.ndarray: Nelder-Mead法のための初期シンプレックス。各行が1つの頂点を表します。
"""
x, relax, thick = parameter_set_to_values(param_set)
x0 = pack_optimize_values(param_set, optimize_names)
simplex = [x0.copy()]
for idx, name in enumerate(optimize_names):
xnew = x0.copy()
if name == "x":
xnew[idx] = 1.0 if x <= 0.5 else 0.0
elif name == "relax":
xnew[idx] = 1.0 if relax <= 0.5 else 0.0
elif name == "thick":
xnew[idx] = max(thick * 1.25, 1.0) # Thick must be > 0.
simplex.append(xnew)
# thickのみを最適化する場合のシンプレックス調整
# thick * 0.75 と thick * 1.25 で構成
if optimize_names == ["thick"]:
simplex = [
np.array([max(thick * 0.75, 1.0)], dtype=float),
np.array([max(thick * 1.25, 1.0)], dtype=float),
]
return np.array(simplex, dtype=float)
[ドキュメント]
def get_param_bounds(name, current_value):
"""
特定のパラメータの探索境界(下限と上限)を取得します。
詳細説明:
組成(`x`)と緩和度(`relax`)はそれぞれ[0.0, 1.0]に制限されます。
膜厚(`thick`)は現在の値を基準に相対的な範囲([0.5*current_value, 1.5*current_value])で、
かつ最低1.0Åという制約が適用されます。
これらはPSOアルゴリズムなどでの粒子の位置更新や初期化に利用されます。
Parameters:
:param name: str: パラメータ名 ("x", "relax", "thick")。
:param current_value: float: そのパラメータの現在の値。
Returns:
tuple[float, float]: (下限値, 上限値) のタプル。
"""
if name == "x":
return 0.0, 1.0
if name == "relax":
return 0.0, 1.0
if name == "thick":
low = max(current_value * 0.5, 1.0)
high = max(current_value * 1.5, low + 1.0) # Ensure high is strictly greater than low if low is 1.0
return low, high
return -np.inf, np.inf # デフォルトでは無限の範囲
[ドキュメント]
def randomize_param_set_for_pso(base_param_set, optimize_names):
"""
PSOの初期化のために、最適化対象のパラメータをランダムな値に設定した新しいパラメータセットを生成します。
詳細説明:
`base_param_set` をクローンし、`optimize_names` に含まれる各パラメータについて、
`get_param_bounds` で定義された範囲内で一様乱数を用いて値を設定します。
Parameters:
:param base_param_set: dict: 基準となるパラメータセット(最適化対象外の値を保持するために使用)。
:param optimize_names: list[str]: ランダム化する最適化対象のパラメータ名のリスト。
Returns:
dict: ランダム化された最適化対象パラメータを持つ新しいパラメータセット。
"""
ps = clone_parameter_set(base_param_set)
for name in optimize_names:
value = ps[name]["value"]
low, high = get_param_bounds(name, value)
ps[name]["value"] = random.uniform(low, high)
return ps
# ============================================================
# fit: ランダム探索
# ============================================================
[ドキュメント]
def fit_random(
substrate,
film,
energy,
omega,
int_target,
param_sets,
parameter_file,
yscale="log",
residual_scale="log",
):
"""
ランダムウォーク的な手法でXRDフィッティングを行います。
詳細説明:
現在の最適パラメータセットからランダムに少しずつパラメータを変化させ、
残差が改善されればその新しいパラメータセットを採用するという手順を繰り返します。
一定回数改善が見られない場合や、残差が `LIMIT` 以下になれば停止します。
最適化の進捗はライブプロットで視覚的に表示されます。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
:param int_target: numpy.ndarray: 実験データ強度。
:param param_sets: dict: 初期パラメータ、最適化の進捗がここに保存されます。
:param parameter_file: str: パラメータの保存先ファイルパス。
:param yscale: str: プロットのY軸スケール ("linear" または "log")。
:param residual_scale: str: 残差計算に使用するスケール ("log" または "linear")。
Returns:
dict: 最適化されたパラメータセットを含む `param_sets` 辞書。
"""
param_set = clone_parameter_set(param_sets[DEFAULT_PRIMARY_SET])
optimize_names = get_optimize_names(param_set)
if len(optimize_names) == 0:
print("All parameters are fixed. No optimization is performed.")
return param_sets
int_init = calc_pattern_from_param_set(substrate, film, energy, omega, param_set)
int_try_now = int_init.copy()
res = calc_residual(int_target, int_try_now, residual_scale=residual_scale)
rrange = update_rrange(res)
fig, ax, line_fit, text_box = make_live_plot(
omega,
int_target,
int_init,
"膜のXRDフィッティング (Random)",
yscale=yscale,
)
total_count = 0
count = 0
while (res > LIMIT) and (count < MAX_STALL):
new_param_set = clone_parameter_set(param_set)
# 各最適化対象パラメータにランダムな変動を加える
if "x" in optimize_names:
new_param_set["x"]["value"] += random.uniform(-rrange * XRANGE, rrange * XRANGE)
if "relax" in optimize_names:
new_param_set["relax"]["value"] += random.uniform(-rrange * RRANGE, rrange * RRANGE)
if "thick" in optimize_names:
new_param_set["thick"]["value"] += random.uniform(-rrange * TRANGE, rrange * TRANGE)
res_new = objective_from_param_set(
substrate, film, energy, omega, int_target, new_param_set, residual_scale=residual_scale
)
total_count += 1
count += 1
if total_count % NINTERVAL_PRINT == 0:
x, relax, thick = parameter_set_to_values(param_set)
print(
f"{total_count} {count} : "
f"residual= {res:.6f} x= {x:.4f} relax= {relax:.4f} T= {thick:.2f}"
)
if res_new < res:
param_set = new_param_set
int_try_now = calc_pattern_from_param_set(substrate, film, energy, omega, param_set)
res = res_new
rrange = update_rrange(res)
count = 0
if total_count % NINTERVAL_SAVE == 0:
set_best_parameter_set(param_sets, param_set)
save_parameters(parameter_file, param_sets)
if total_count % NINTERVAL_PLOT == 0:
x, relax, thick = parameter_set_to_values(param_set)
update_live_plot(
fig,
line_fit,
text_box,
omega,
int_try_now,
[
f"iter= {total_count}",
f"residual= {res:.6f}",
f"x= {x:.4f}",
f"r= {relax:.4f}",
f"t= {thick:.2f}",
],
)
set_best_parameter_set(param_sets, param_set)
save_parameters(parameter_file, param_sets)
show_final_live_plot_wait(fig)
return param_sets
# ============================================================
# fit: scipy minimize
# ============================================================
[ドキュメント]
def fit_scipy(
substrate,
film,
energy,
omega,
int_target,
param_sets,
parameter_file,
method_name,
nmaxiter,
tol,
yscale="log",
residual_scale="log",
):
"""
SciPyの最適化アルゴリズム(Nelder-Mead, BFGS, CG)を使用してXRDフィッティングを行います。
詳細説明:
`scipy.optimize.minimize` 関数を利用し、指定された最適化手法でパラメータを調整します。
目的関数は `objective_from_param_set` で定義されており、物理的な制約へのペナルティを含みます。
最適化の進捗はコールバック関数を通じてライブプロットにリアルタイムで表示され、
一定間隔でパラメータがファイルに保存されます。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
:param int_target: numpy.ndarray: 実験データ強度。
:param param_sets: dict: 初期パラメータ、最適化の進捗がここに保存されます。
:param parameter_file: str: パラメータの保存先ファイルパス。
:param method_name: str: 使用するSciPy最適化手法の名前 ("nelder-mead", "bfgs", "cg")。
:param nmaxiter: int: 最大反復回数。
:param tol: float: 収束判定の許容誤差。
:param yscale: str: プロットのY軸スケール ("linear" または "log")。
:param residual_scale: str: 残差計算に使用するスケール ("log" または "linear")。
Returns:
dict: 最適化されたパラメータセットを含む `param_sets` 辞書。
"""
param_set = clone_parameter_set(param_sets[DEFAULT_PRIMARY_SET])
optimize_names = get_optimize_names(param_set)
if len(optimize_names) == 0:
print("All parameters are fixed. No optimization is performed.")
return param_sets
x0 = pack_optimize_values(param_set, optimize_names)
int_init = calc_pattern_from_param_set(substrate, film, energy, omega, param_set)
fig, ax, line_fit, text_box = make_live_plot(
omega,
int_target,
int_init,
f"膜のXRDフィッティング ({method_name})",
yscale=yscale,
)
iter_state = {"count": 0, "best_res": None, "best_param_set": clone_parameter_set(param_set)}
def objective_vec(xvec):
"""最適化アルゴリズムに渡す目的関数 (ベクトル入力用)。"""
work = unpack_optimize_values(param_set, optimize_names, xvec)
return objective_from_param_set(
substrate, film, energy, omega, int_target, work, residual_scale=residual_scale
)
def callback(xk):
"""最適化の各ステップで呼び出されるコールバック関数。"""
iter_state["count"] += 1
work = unpack_optimize_values(param_set, optimize_names, xk)
res = objective_from_param_set(
substrate, film, energy, omega, int_target, work, residual_scale=residual_scale
)
if iter_state["best_res"] is None or res < iter_state["best_res"]:
iter_state["best_res"] = res
iter_state["best_param_set"] = clone_parameter_set(work)
x, relax, thick = parameter_set_to_values(work)
if iter_state["count"] % NINTERVAL_PRINT == 0:
print(
f"{iter_state['count']}/{nmaxiter} : "
f"residual= {res:.6f} x= {x:.4f} relax= {relax:.4f} T= {thick:.2f}"
)
if iter_state["count"] % NINTERVAL_SAVE == 0:
set_best_parameter_set(param_sets, iter_state["best_param_set"])
save_parameters(parameter_file, param_sets)
if iter_state["count"] % NINTERVAL_PLOT == 0:
int_fit = calc_pattern_from_param_set(substrate, film, energy, omega, work)
update_live_plot(
fig,
line_fit,
text_box,
omega,
int_fit,
[
f"iter= {iter_state['count']}",
f"residual= {res:.6f}",
f"x= {x:.4f}",
f"r= {relax:.4f}",
f"t= {thick:.2f}",
],
)
scipy_method = {
"nelder-mead": "Nelder-Mead",
"bfgs": "BFGS",
"cg": "CG",
}[method_name]
options = {"maxiter": nmaxiter, "disp": True}
if method_name == "nelder-mead":
options["initial_simplex"] = make_initial_simplex(param_set, optimize_names)
options["xatol"] = tol # absolute tolerance for changes in x
options["fatol"] = tol # absolute tolerance for changes in fn
result = minimize(
objective_vec,
x0,
method=scipy_method,
callback=callback,
options=options,
tol=tol,
)
final_param_set = unpack_optimize_values(param_set, optimize_names, result.x)
final_res = objective_from_param_set(
substrate, film, energy, omega, int_target, final_param_set, residual_scale=residual_scale
)
# コールバック関数で記録された最良の結果と最終結果を比較
if iter_state["best_res"] is not None and iter_state["best_res"] < final_res:
final_param_set = clone_parameter_set(iter_state["best_param_set"])
set_best_parameter_set(param_sets, final_param_set)
save_parameters(parameter_file, param_sets)
show_final_live_plot_wait(fig)
return param_sets
# ============================================================
# fit: PSO (粒子群最適化)
# ============================================================
[ドキュメント]
def fit_pso(
substrate,
film,
energy,
omega,
int_target,
param_sets,
parameter_file,
nmaxiter,
tol,
nparticles,
w,
c1,
c2,
yscale="log",
residual_scale="log",
pso_stall_max=20,
pso_spread_rtol=0.01,
):
"""
粒子群最適化 (PSO) を使用してXRDフィッティングを行います。
詳細説明:
この関数は、粒子群最適化アルゴリズムを実装し、複数の「粒子」が解空間を探索します。
各粒子は、自身の最良の位置 (`pbest`) と群全体の最良の位置 (`gbest`) に基づいて
速度と位置を更新します。一定期間の改善なし (`pso_stall_max`) または
粒子の位置の分散が閾値以下になった場合 (`pso_spread_rtol`) に早期停止します。
最適化の進捗はライブプロットで視覚的に表示され、一定間隔でパラメータがファイルに保存されます。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 計算する角度範囲のomega値の配列 [deg]。
:param int_target: numpy.ndarray: 実験データ強度。
:param param_sets: dict: 初期パラメータ、最適化の進捗がここに保存されます。
:param parameter_file: str: パラメータの保存先ファイルパス。
:param nmaxiter: int: 最大反復回数。
:param tol: float: 収束判定の許容誤差(残差の改善量に対する閾値)。
:param nparticles: int: 粒子の総数。
:param w: float: PSOの慣性重み。
:param c1: float: PSOの自己学習係数。
:param c2: float: PSOの社会学習係数。
:param yscale: str: プロットのY軸スケール ("linear" または "log")。
:param residual_scale: str: 残差計算に使用するスケール ("log" または "linear")。
:param pso_stall_max: int: 改善が見られない場合にPSOを停止する最大反復数。
:param pso_spread_rtol: float: 粒子の分散に基づく停止条件の相対許容誤差。
Returns:
dict: 最適化されたパラメータセットを含む `param_sets` 辞書。
"""
base_param_set = clone_parameter_set(param_sets[DEFAULT_PRIMARY_SET])
optimize_names = get_optimize_names(base_param_set)
if len(optimize_names) == 0:
print("All parameters are fixed. No optimization is performed.")
return param_sets
int_init = calc_pattern_from_param_set(substrate, film, energy, omega, base_param_set)
fig, ax, line_fit, text_box = make_live_plot(
omega,
int_target,
int_init,
"膜のXRDフィッティング (PSO)",
yscale=yscale,
)
seed_particles = get_pso_parameter_sets(param_sets)
particles = [clone_parameter_set(ps) for ps in seed_particles]
# 必要に応じて粒子数を増やす
while len(particles) < nparticles:
particles.append(randomize_param_set_for_pso(base_param_set, optimize_names))
# 必要に応じて粒子数を減らす
if len(particles) > nparticles:
particles = particles[:nparticles]
positions = []
velocities = []
pbest_positions = []
pbest_scores = []
for ps in particles:
pos = pack_optimize_values(ps, optimize_names)
vel = np.zeros_like(pos)
# 速度の初期化
for i, name in enumerate(optimize_names):
val = ps[name]["value"]
low, high = get_param_bounds(name, val)
span = high - low
# 初期速度を探索範囲の10%程度でランダムに設定
vel[i] = random.uniform(-0.1 * span, 0.1 * span)
score = objective_from_param_set(
substrate, film, energy, omega, int_target, ps, residual_scale=residual_scale
)
positions.append(pos.copy())
velocities.append(vel.copy())
pbest_positions.append(pos.copy())
pbest_scores.append(score)
positions = np.array(positions, dtype=float)
velocities = np.array(velocities, dtype=float)
pbest_positions = np.array(pbest_positions, dtype=float)
pbest_scores = np.array(pbest_scores, dtype=float)
gbest_idx = int(np.argmin(pbest_scores))
gbest_position = pbest_positions[gbest_idx].copy()
gbest_score = float(pbest_scores[gbest_idx])
iter_count = 0
prev_best = gbest_score
no_improve_count = 0
base_x, base_relax, base_thick = parameter_set_to_values(base_param_set)
# 粒子の分散許容誤差をパラメータごとに調整
spread_tol_map = {
"x": pso_spread_rtol * 1.0, # xは[0,1]なので相対値そのまま
"relax": pso_spread_rtol * 1.0, # relaxも[0,1]なので相対値そのまま
"thick": pso_spread_rtol * base_thick, # thickは絶対値なので、基準値に対する相対値
}
while iter_count < nmaxiter:
iter_count += 1
for p in range(nparticles):
for i, name in enumerate(optimize_names):
current_val = positions[p, i]
low, high = get_param_bounds(name, current_val)
r1 = random.random()
r2 = random.random()
# 速度の更新
velocities[p, i] = (
w * velocities[p, i]
+ c1 * r1 * (pbest_positions[p, i] - positions[p, i])
+ c2 * r2 * (gbest_position[i] - positions[p, i])
)
# 位置の更新
positions[p, i] += velocities[p, i]
# 境界条件処理
if positions[p, i] < low:
positions[p, i] = low
velocities[p, i] *= -0.5 # 境界で跳ね返す
elif positions[p, i] > high:
positions[p, i] = high
velocities[p, i] *= -0.5 # 境界で跳ね返す
work = unpack_optimize_values(base_param_set, optimize_names, positions[p])
score = objective_from_param_set(
substrate, film, energy, omega, int_target, work, residual_scale=residual_scale
)
# pbestの更新
if score < pbest_scores[p]:
pbest_scores[p] = score
pbest_positions[p] = positions[p].copy()
# gbestの更新
if score < gbest_score:
gbest_score = float(score)
gbest_position = positions[p].copy()
best_param_set = unpack_optimize_values(base_param_set, optimize_names, gbest_position)
best_int = calc_pattern_from_param_set(substrate, film, energy, omega, best_param_set)
best_x, best_relax, best_thick = parameter_set_to_values(best_param_set)
if iter_count % NINTERVAL_PRINT == 0:
print(
f"{iter_count}/{nmaxiter} : residual= {gbest_score:.6f} "
f"x= {best_x:.4f} relax= {best_relax:.4f} T= {best_thick:.2f}"
)
if iter_count % NINTERVAL_SAVE == 0:
set_best_parameter_set(param_sets, best_param_set)
# 現在の粒子群の最良なものを保存
particle_sets = []
order = np.argsort(pbest_scores)
for idx in order[:nparticles]: # 上位nparticles個の粒子を保存
ps = unpack_optimize_values(base_param_set, optimize_names, pbest_positions[idx])
particle_sets.append(ps)
param_sets = replace_pso_parameter_sets(param_sets, particle_sets)
save_parameters(parameter_file, param_sets)
if iter_count % NINTERVAL_PLOT == 0:
update_live_plot(
fig,
line_fit,
text_box,
omega,
best_int,
[
f"iter= {iter_count}",
f"residual= {gbest_score:.6f}",
f"x= {best_x:.4f}",
f"r= {best_relax:.4f}",
f"t= {best_thick:.2f}",
],
)
# 停止条件のチェック
improve = prev_best - gbest_score
if improve < tol: # 残差の改善が許容誤差未満
no_improve_count += 1
else:
no_improve_count = 0
# 粒子の分散が十分に小さいかチェック
spread_ok = True
spread_info = {} # デバッグ用
for i, name in enumerate(optimize_names):
spread = float(np.max(positions[:, i]) - np.min(positions[:, i]))
spread_info[name] = spread
if spread > spread_tol_map[name]:
spread_ok = False
if no_improve_count >= pso_stall_max:
print("PSO stop: stall criterion satisfied.")
break
if spread_ok:
print("PSO stop: spread criterion satisfied.")
break
prev_best = gbest_score
set_best_parameter_set(param_sets, best_param_set)
save_parameters(parameter_file, param_sets)
show_final_live_plot_wait(fig)
return param_sets
# ============================================================
# guessモード ヘルパー
# ============================================================
[ドキュメント]
def ensure_valid_savgol(points, order, ndata):
"""
Savitzky-Golayフィルタのパラメータがデータ長に対して妥当であることを保証します。
詳細説明:
ウィンドウサイズ `points` は奇数であり、データ長 `ndata` を超えないように調整されます。
多項式次数 `order` はウィンドウサイズ `points` より小さく調整されます。
Parameters:
:param points: int: Savitzky-Golayフィルタのウィンドウ点数。
:param order: int: Savitzky-Golayフィルタの多項式次数。
:param ndata: int: 入力データの総点数。
Returns:
tuple[int, int]: 調整された (ウィンドウ点数, 多項式次数)。
"""
points = int(points)
order = int(order)
if points < 3: points = 3
if points % 2 == 0: points += 1 # Ensure odd window length
if points > ndata: points = ndata if ndata % 2 == 1 else ndata - 1
if order >= points: order = points - 1
return points, order
[ドキュメント]
def estimate_thickness_from_fft_signal(omega_deg, signal, wavelength, keep_n=5, nq_uniform=DEFAULT_NQ_UNIFORM):
"""
信号に対してFFTを行い、主要な周波数成分から膜厚の候補を算出します。
詳細説明:
入力の角度データと信号をq軸に変換し、一様間隔で補間します。
ハニング窓を適用した後、FFTを実行して周波数スペクトルを取得します。
スペクトル中の顕著なピークを検出し、その周波数から膜厚候補を計算します。
複数の候補をスコアの高い順にソートして返します。
Parameters:
:param omega_deg: numpy.ndarray: 角度データ [deg]。
:param signal: numpy.ndarray: 解析対象の信号 (通常はエンベロープ除去後の残差)。
:param wavelength: float: X線波長 [Å]。
:param keep_n: int: 保持する膜厚候補の最大数。
:param nq_uniform: int: FFTのために一様補間するq軸の点数。
Returns:
tuple[list[dict], numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]:
- candidates_list: 膜厚候補のリスト。各候補は辞書 (`method`, `thick`, `score`, `dq`, `freq`)。
- qz_axis: 一様化されたqz軸。
- signal_uniform: 一様化された信号。
- freq_axis: FFTの周波数軸。
- fft_amplitude: FFTの振幅。
"""
theta_rad = np.deg2rad(omega_deg)
qz = 4.0 * np.pi / wavelength * np.sin(theta_rad)
idx = np.argsort(qz)
qz = qz[idx]
signal = signal[idx]
# q軸を一様間隔に補間
qz_uniform = np.linspace(qz.min(), qz.max(), nq_uniform)
y_uniform = np.interp(qz_uniform, qz, signal)
y_uniform = y_uniform - np.mean(y_uniform) # DC成分除去
dq = qz_uniform[1] - qz_uniform[0]
fft_vals = np.fft.rfft(y_uniform * np.hanning(len(y_uniform))) # ハニング窓適用
fft_amp = np.abs(fft_vals)
freq = np.fft.rfftfreq(len(y_uniform), d=dq)
if len(fft_amp) > 0: fft_amp[0] = 0.0 # DC成分をゼロにする
# FFTスペクトルからピークを検出
peak_idx, props = find_peaks(fft_amp, prominence=np.max(fft_amp) * 0.05 if np.max(fft_amp)>0 else 0)
candidates = []
if len(peak_idx) > 0:
order = np.argsort(props.get("prominences", np.ones(len(peak_idx))))[::-1] # Prominenceでソート
for idxp in peak_idx[order][:keep_n]:
f = freq[idxp]
if f <= 0: continue # 周波数が0以下のものは無視
dq_val = 1.0 / f
thick = 2.0 * np.pi / dq_val # 膜厚 = 2*pi / dq
candidates.append({"method": "fft", "thick": float(thick), "score": float(fft_amp[idxp]), "dq": float(dq_val), "freq": float(f)})
return sorted(candidates, key=lambda d: d["score"], reverse=True), qz_uniform, y_uniform, freq, fft_amp
[ドキュメント]
def assign_fringe_index_auto(q_vals, dq_est):
"""
FFTによる推定周期 `dq_est` に基づき、各ピークにフリンジ次数 (n) を自動的に割り当てます。
詳細説明:
フリンジのQ値 (`q_vals`) の差分と推定された周期 (`dq_est`) を比較し、
最も近い整数倍をフリンジ次数のステップとして採用します。
Parameters:
:param q_vals: numpy.ndarray: 検出されたフリンジピークのQ値の配列。
:param dq_est: float: FFTによって推定されたフリンジ周期 (Δq)。
Returns:
numpy.ndarray: 各ピークに割り当てられたフリンジ次数 (n) の配列。
"""
if len(q_vals) == 0: return np.array([], dtype=int)
n_vals = [0] # 最初のピークを0次としてスタート
for i in range(1, len(q_vals)):
step = int(round((q_vals[i] - q_vals[i - 1]) / dq_est))
n_vals.append(n_vals[-1] + max(step, 1)) # 少なくとも1次ずつ増えることを保証
return np.array(n_vals, dtype=int)
[ドキュメント]
def linear_fit_two_stage(n_vals, q_vals):
"""
フリンジ次数 (n) と q 値の関係を2段階の線形回帰で解析し、傾きから膜厚を求めます。
詳細説明:
1. 全てのピークデータを用いて線形回帰を行い、初期の残差と標準偏差を計算します。
2. 初期残差が標準偏差の2倍を超える外れ値を除外し、残りのデータで再度線形回帰を行います。
3. 最終的な回帰の傾き (dq/dn) から、膜厚 (Thickness = 2π / (dq/dn)) を計算します。
Parameters:
:param n_vals: numpy.ndarray: フリンジ次数の配列。
:param q_vals: numpy.ndarray: 対応するQ値の配列。
Returns:
dict or None: 解析結果を含む辞書。ピークが2つ未満の場合はNone。
- `coef1`: 1段階目の回帰係数。
- `coef2`: 2段階目の回帰係数。
- `used_stage1`: 1段階目で使用されたピークのブール配列。
- `used_stage2`: 2段階目で使用されたピークのブール配列。
- `resid_stage1`: 1段階目の残差。
- `resid_stage2`: 2段階目の残差。
- `thickness`: 計算された膜厚 [Å]。
"""
if len(n_vals) < 2: return None # 2点以上必要
# Stage 1: 全てのデータで線形回帰
coef1 = np.polyfit(n_vals, q_vals, 1)
resid1 = q_vals - np.polyval(coef1, n_vals)
sigma1 = np.std(resid1, ddof=1) if len(resid1) >= 2 else 0 # 標本標準偏差 (ddof=1)
# Stage 2: 外れ値除去後に再回帰
if sigma1 > 0:
used2 = np.abs(resid1) < 2.0 * sigma1 # 2σ以内に絞る
else: # sigma1が0の場合、全て使用(データ点が少ないなど)
used2 = np.ones(len(n_vals), dtype=bool)
if np.sum(used2) < 2: # 除去後も2点以上必要
used2 = np.ones(len(n_vals), dtype=bool) # 2点未満なら全て使用に戻す
coef2 = np.polyfit(n_vals[used2], q_vals[used2], 1)
# 傾きから膜厚を計算 (dq/dn) -> thickness = 2*pi / |slope|
thickness = 2.0 * np.pi / abs(coef2[0]) if coef2[0] != 0 else np.nan
return {
"coef1": coef1,
"coef2": coef2,
"used_stage1": np.ones(len(n_vals), dtype=bool), # 常に全てTrue
"used_stage2": used2,
"resid_stage1": resid1,
"resid_stage2": q_vals - np.polyval(coef2, n_vals),
"thickness": thickness
}
[ドキュメント]
def detect_fringe_clusters(q_vals, dq_ref, gap_factor=DEFAULT_CLUSTER_GAP_FACTOR):
"""
フリンジの欠損が大きい箇所でデータを分割し、クラスター化します。
詳細説明:
検出されたQ値の差分が参照周期 `dq_ref` に `gap_factor` を掛けた値よりも大きい場合、
その箇所でフリンジピークのシーケンスが途切れていると判断し、データを複数のクラスターに分割します。
これにより、不連続なフリンジパターンでも個別に解析できるようになります。
各クラスターは少なくとも2つのピークを含む必要があります。
Parameters:
:param q_vals: numpy.ndarray: 検出されたフリンジピークのQ値の配列。
:param dq_ref: float: フリンジ周期の参照値 (FFTから得られた推定値など)。
:param gap_factor: float: クラスター分割の閾値を決定するための係数。
Returns:
list[numpy.ndarray]: 各クラスターに属するピークのインデックスの配列のリスト。
"""
if len(q_vals) < 2: return [np.arange(len(q_vals))]
diffs = np.diff(q_vals)
# 差が基準値より大きい箇所で分割
split_idx = np.where(diffs > gap_factor * dq_ref)[0]
clusters = []
start = 0
for idx in split_idx:
clusters.append(np.arange(start, idx + 1))
start = idx + 1
clusters.append(np.arange(start, len(q_vals)))
return [cl for cl in clusters if len(cl) >= 2] # 2点以上のクラスターのみを保持
[ドキュメント]
def build_cluster_peak_tables(omega_cut, residual_for_peaks, wavelength, dq_ref, gap_factor):
"""
フリンジピークを検出し、検出されたピークをクラスター化し、
各クラスターで線形回帰分析を実行します。
詳細説明:
1. 残差信号からフリンジピークを検出します。
2. 検出されたピークのQ値を計算します。
3. `detect_fringe_clusters` を使用して、ピークを論理的なクラスターに分割します。
4. 各クラスター内で `assign_fringe_index_auto` でフリンジ次数を割り当て、
`linear_fit_two_stage` で線形回帰を行い、膜厚を推定します。
5. 検出されたピークの詳細 (`all_peak_rows`) とクラスターごとの解析結果 (`cluster_rows`)
および回帰結果 (`cluster_fit_results`) を生成します。
Parameters:
:param omega_cut: numpy.ndarray: 角度範囲を限定したomegaの配列 [deg]。
:param residual_for_peaks: numpy.ndarray: ピーク検出に使用する残差信号。
:param wavelength: float: X線波長 [Å]。
:param dq_ref: float: フリンジ周期の参照値 (Δq)。
:param gap_factor: float: クラスター分割の閾値を決定するための係数。
Returns:
tuple[list[dict], list[dict], list[dict]]:
- all_peak_rows: 検出された全ピークの詳細情報。
- cluster_rows: 各クラスターごとの解析サマリー。
- cluster_fit_results: 各クラスターの線形回帰の詳細結果。
ピークが検出されない場合は、空のリストとNoneが返されます。
"""
# 残差信号からピークを検出
peak_idx, props = find_peaks(residual_for_peaks, prominence=np.max(residual_for_peaks) * 0.02 if np.max(residual_for_peaks)>0 else 0)
if len(peak_idx) == 0: return [], [], None
# プロミネンスが高いピークから最大30個を選択し、角度順にソート
order = np.argsort(props.get("prominences", np.ones(len(peak_idx))))[::-1]
peak_idx = np.sort(peak_idx[order][:30])
theta_deg = omega_cut[peak_idx]
q_vals = 4.0 * np.pi / wavelength * np.sin(np.deg2rad(theta_deg))
# ピークをクラスターに分割
clusters = detect_fringe_clusters(q_vals, dq_ref, gap_factor)
all_peak_rows, cluster_rows, cluster_fit_results = [], [], []
for cluster_id, cl in enumerate(clusters):
q_cl, twotheta_cl = q_vals[cl], 2.0 * theta_deg[cl]
n_cl = assign_fringe_index_auto(q_cl, dq_ref)
fit_result = linear_fit_two_stage(n_cl, q_cl)
if fit_result is None: continue # 回帰ができなかった場合、このクラスターはスキップ
cluster_fit_results.append({"cluster_id": cluster_id, "n_vals": n_cl, "q_vals": q_cl, "fit_result": fit_result})
for i in range(len(q_cl)):
all_peak_rows.append({
"cluster_id": cluster_id, "n": int(n_cl[i]), "q": float(q_cl[i]), "twotheta": float(twotheta_cl[i]),
"used_stage1": True, "used_stage2": bool(fit_result["used_stage2"][i]),
"resid_stage1": float(fit_result["resid_stage1"][i]), "resid_stage2": float(fit_result["resid_stage2"][i])
})
cluster_rows.append({
"cluster_id": cluster_id, "npeaks": len(q_cl), "dq_est": float(dq_ref),
"thick_fft": float(2.0 * np.pi / dq_ref), "thick_reg": float(fit_result["thickness"]),
"confidence": float(len(q_cl)) # クラスター内のピーク数で信頼度を表現
})
return all_peak_rows, cluster_rows, cluster_fit_results
# ============================================================
# guess モード 本体
# ============================================================
[ドキュメント]
def guess_thickness(
substrate, film, energy, omega, intensity, parameter_file, param_sets,
guess_low, guess_high, nsmooth_points, nsmooth_order, nguess_keep,
cluster_gap_factor=DEFAULT_CLUSTER_GAP_FACTOR, yscale="log", infile=""
):
"""
XRDフリンジ解析に基づいて膜厚を推定します。解析結果をプロットし、上位候補をパラメータCSVに保存します。
詳細説明:
1. 指定された角度範囲でXRDデータの一部を切り出します。
2. Savitzky-Golayフィルタを使用して強度データからエンベロープを除去し、残差信号を抽出します。
3. 残差信号にFFTを適用し、主要な周波数成分から膜厚の初期候補 (Δq_est) を算出します。
4. 残差信号からフリンジピークを検出し、Δq_estとギャップ係数に基づいてピークをクラスターに分割します。
5. 各クラスター内でフリンジ次数を割り当て、線形回帰分析を行い、最終的な膜厚候補を算出します。
6. 検出されたピーク情報とクラスター情報をCSVファイルに保存します。
7. FFTと線形回帰の両方から得られた膜厚候補をスコア順にソートし、上位の候補をコンソールに表示します。
8. 最もスコアの高い膜厚候補をプライマリパラメータセットの膜厚値として設定し、
上位の候補はPSO粒子としてパラメータファイルに保存されます。
9. 解析の主要なステップ(信号・残差、FFTスペクトル、n-qプロット)を視覚化します。
Parameters:
:param substrate: xu.materials.Crystal: 基板材料オブジェクト。
:param film: xu.materials.Crystal: 膜材料オブジェクト。
:param energy: float: X線エネルギー [eV]。
:param omega: numpy.ndarray: 全角度範囲のomega値の配列 [deg]。
:param intensity: numpy.ndarray: 全強度データ。
:param parameter_file: str: パラメータの保存先ファイルパス。
:param param_sets: dict: パラメータセット(推定結果がここに保存されます)。
:param guess_low: float: 解析を開始する角度 [deg]。
:param guess_high: float: 解析を終了する角度 [deg]。
:param nsmooth_points: int: Savitzky-Golayフィルタのウィンドウ点数。
:param nsmooth_order: int: Savitzky-Golayフィルタの多項式次数。
:param nguess_keep: int: 保持する膜厚候補の数。
:param cluster_gap_factor: float: フリンジクラスター判定用のギャップ係数。
:param yscale: str: プロットのY軸スケール ("linear" または "log")。
:param infile: str: 入力データファイル名(出力ファイル名生成に使用)。
Returns:
dict: 更新されたパラメータセットを含む `param_sets` 辞書。
"""
mask = (omega >= guess_low) & (omega <= guess_high)
if np.sum(mask) < 10:
print("Error: guess range contains too few points.")
return param_sets
omega_cut, int_cut = omega[mask], intensity[mask]
y_signal = np.log(np.maximum(int_cut, 1e-30)) if yscale == "log" else int_cut.copy()
# Savitzky-Golayフィルタのパラメータを調整
npts, norder = ensure_valid_savgol(nsmooth_points, nsmooth_order, len(y_signal))
envelope = savgol_filter(y_signal, window_length=npts, polyorder=norder)
residual = y_signal - envelope
wavelength = xu.wavelength("CuKa1") # Cu Kα1波長を取得
# FFTによる膜厚推定
fft_cands, _, _, freq, fft_amp = estimate_thickness_from_fft_signal(omega_cut, residual, wavelength, keep_n=max(nguess_keep, 10))
if not fft_cands:
print("Warning: No FFT candidates found for thickness estimation.")
return param_sets
dq_ref = fft_cands[0]["dq"] # FFTで最も支配的なΔqを基準とする
# ピーク検出とクラスター化、線形回帰
# 残差が負の値にならないように調整してピーク検出
peak_rows, cluster_rows, cluster_fits = build_cluster_peak_tables(omega_cut, np.maximum(residual - np.min(residual), 0), wavelength, dq_ref, cluster_gap_factor)
if peak_rows: save_guess_peak_csv(get_guess_peaks_filename(infile), peak_rows)
if cluster_rows: save_guess_cluster_csv(get_guess_clusters_filename(infile), cluster_rows)
# 全ての膜厚候補を収集し、スコアでソート
all_cands = []
for c in fft_cands[:nguess_keep]:
all_cands.append({"method": "fft", "thick": c["thick"], "score": c["score"]})
for r in cluster_rows:
if np.isfinite(r["thick_reg"]): # 回帰による膜厚が有効な場合のみ追加
all_cands.append({"method": f"cl{r['cluster_id']}-reg", "thick": r["thick_reg"], "score": r["confidence"]})
all_cands = sorted(all_cands, key=lambda x: x["score"], reverse=True)
if not all_cands:
print("Warning: No valid thickness candidates found.")
return param_sets
# 最良の膜厚をプライマリセットに設定
best_ps = clone_parameter_set(param_sets[DEFAULT_PRIMARY_SET])
best_ps["thick"]["value"] = all_cands[0]["thick"]
set_best_parameter_set(param_sets, best_ps)
# 上位の候補をPSOの粒子としてパラメータファイルに保存
pso_cands = []
for c in all_cands[:nguess_keep]:
ps = clone_parameter_set(param_sets[DEFAULT_PRIMARY_SET])
ps["thick"]["value"] = c["thick"]
pso_cands.append(ps)
param_sets = replace_pso_parameter_sets(param_sets, pso_cands)
save_parameters(parameter_file, param_sets)
print("推定膜厚候補:")
for i, c in enumerate(all_cands[:10]): # 上位10件を表示
print(f" {i:2d} : {c['method']:10s} thick= {c['thick']:.2f} A")
# プロット生成
plt.ion() # インタラクティブモードON
# Figure 1: Signal/Envelope/Residual
fig1, ax1 = plt.subplots(figsize=(8, 5))
ax1.plot(omega_cut, y_signal, label="signal")
ax1.plot(omega_cut, envelope, label="envelope")
ax1.plot(omega_cut, residual, label="residual")
ax1.legend(); ax1.set_title("Fringe Analysis: Signal & Residual")
ax1.set_xlabel(r"$\omega\ (deg)$")
ax1.set_ylabel("Intensity (arb. u.)")
ax1.set_yscale(yscale)
# Figure 2: FFT Spectrum
fig2, ax2 = plt.subplots(figsize=(8, 5))
if len(freq) > 1 and len(fft_amp) > 1: # DC成分以外を表示
ax2.plot(freq[1:], fft_amp[1:])
ax2.set_title("FFT Spectrum")
ax2.set_xlabel(r"Frequency ($1/\AA^{-1}$)")
ax2.set_ylabel("Amplitude")
# Figure 3: N-Q plot (フリンジ次数 vs Q値)
if cluster_fits:
fig3, ax3 = plt.subplots(figsize=(7, 5))
for f in cluster_fits:
# Stage 2で採用されたピークのみを強調表示
used_peaks_q = f["q_vals"][f["fit_result"]["used_stage2"]]
used_peaks_n = f["n_vals"][f["fit_result"]["used_stage2"]]
rejected_peaks_q = f["q_vals"][~f["fit_result"]["used_stage2"]]
rejected_peaks_n = f["n_vals"][~f["fit_result"]["used_stage2"]]
ax3.plot(used_peaks_n, used_peaks_q, 'o', label=f"cl {f['cluster_id']} (used)")
if len(rejected_peaks_n) > 0:
ax3.plot(rejected_peaks_n, rejected_peaks_q, 'x', color=ax3.lines[-1].get_color(), alpha=0.5, label=f"cl {f['cluster_id']} (rejected)")
# 回帰直線を描画
ax3.plot(f["n_vals"], np.polyval(f["fit_result"]["coef2"], f["n_vals"]), '--', color=ax3.lines[-1].get_color(), label=f"Fit cl {f['cluster_id']}")
ax3.legend(); ax3.set_title("Linear Regression: n vs q")
ax3.set_xlabel("Fringe Order (n)")
ax3.set_ylabel(r"Q ($1/\AA^{-1}$)")
plt.ioff(); plt.show() # 非インタラクティブモードに戻し、全てのプロットを表示
return param_sets
# ============================================================
# メイン実行ルーチン
# ============================================================
[ドキュメント]
def main(args):
"""
コマンドライン引数に基づき、XRDシミュレーション、データ読み込み、フィッティング、または膜厚推定を実行します。
詳細説明:
この関数は、プログラムのエントリポイントであり、以下の処理フローに従います。
1. 基板と膜の材料モデルを準備します。
2. パラメータファイルから初期パラメータを読み込み、固定パラメータを適用して保存します。
3. `args.mode` に応じて、以下のいずれかの操作を実行します。
- "read": 2theta-intensityデータを読み込み、プロットします。
- "sim": 指定されたパラメータでXRDパターンをシミュレートし、プロットします。
入力ファイルがある場合は実験データと比較してプロットします。
- "guess": フリンジ解析に基づいて膜厚を推定し、結果をプロットしてパラメータファイルを更新します。
- "fit": 指定された最適化手法 (random, nelder-mead, bfgs, cg, pso) を使用して、
実験データにフィッティングを行い、パラメータを最適化します。
フィッティングの進捗はライブプロットで表示されます。
Parameters:
:param args: argparse.Namespace: コマンドライン引数をパースした結果のオブジェクト。
"""
substrate, film = prepare_materials(args.substrate_file, args.film_file)
# Cu Kα1波長を使用してX線エネルギーを計算
energy = 12390.0 / xu.wavelength("CuKa1")
parameter_file = get_parameter_filename(args.infile)
param_sets = read_parameters(parameter_file)
apply_fix_list(param_sets, parse_fix_list(args.fix))
save_parameters(parameter_file, param_sets) # 初期/読み込み済みパラメータを保存
if args.mode == "read":
twotheta, omega, intensity = read_data(args.infile, substrate, film, energy)
plot_xrd(omega, [intensity], ["ReadData"], "XRDデータ読込", yscale=args.yscale, wait=True)
elif args.mode == "sim":
sim_ps = param_sets[DEFAULT_PRIMARY_SET]
twotheta_sim, omega_sim = make_scan_axis()
int_sim = calc_pattern_from_param_set(substrate, film, energy, omega_sim, sim_ps)
if args.infile:
# 入力ファイルがある場合、それを読み込みシミュレーション結果と比較プロット
_, omega_ref, int_ref = read_data(args.infile, substrate, film, energy)
plot_xrd(omega_sim, [int_ref, int_sim], ["Data", "Sim"], "シミュレーション比較", yscale=args.yscale, wait=True)
else:
# 入力ファイルがない場合、シミュレーション結果のみをプロット
plot_xrd(omega_sim, [int_sim], ["Sim"], "XRDシミュレーション", yscale=args.yscale, wait=True)
elif args.mode == "guess":
_, omega, intensity = read_data(args.infile, substrate, film, energy)
guess_thickness(substrate, film, energy, omega, intensity, parameter_file, param_sets, args.guess_low, args.guess_high, args.nsmooth_points, args.nsmooth_order, args.nguess_keep, args.cluster_gap_factor, args.yscale, args.infile)
elif args.mode == "fit":
_, omega, int_target = read_data(args.infile, substrate, film, energy)
if args.method == "random":
fit_random(substrate, film, energy, omega, int_target, param_sets, parameter_file, args.yscale, args.residual_scale)
elif args.method == "pso":
fit_pso(substrate, film, energy, omega, int_target, param_sets, parameter_file, args.nmaxiter, args.tol, args.pso_nparticles, args.pso_w, args.pso_c1, args.pso_c2, args.yscale, args.residual_scale, args.pso_stall_max, args.pso_spread_rtol)
else:
fit_scipy(substrate, film, energy, omega, int_target, param_sets, parameter_file, args.method, args.nmaxiter, args.tol, args.yscale, args.residual_scale)
if __name__ == "__main__":
parser = build_parser()
try:
main(initialize(parser))
except Exception:
traceback.print_exc()
sys.exit(1)