"""
最小二乗法を用いた格子定数計算スクリプト。
このスクリプトは、X線回折データなどの回折データから結晶の格子定数を導出します。
様々な格子系に対応し、観測データの前処理、重み付け、最小二乗フィッティング、
そして最終的な格子定数(逆格子および実格子)の導出を行います。
外れ値の自動除去機能も備えています。
詳細説明:
入力ファイルは特定のフォーマットに従い、複数の計算ジョブを連続して処理できます。
各ジョブでは、格子系、補正フラグ、重み付けモード、回折位置の変換モードなどが
設定され、それに従って観測データが処理されます。
重み付き最小二乗法によりパラメータが決定され、その結果から逆格子定数と実格子定数、
およびそれらの標準偏差が計算されます。計算結果は整形されたレポートファイルとして出力されます。
関連リンク:
:doc:`lsq_latt2_usage`
"""
from __future__ import annotations
import argparse
import math
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable, TextIO
import matplotlib.pyplot as plt
import numpy as np
DEG_PER_RAD = 180.0 / math.pi
RAD_PER_DEG = math.pi / 180.0
ANGLE_EPS = 1e-12
PERTURB_SCALE = 1e-4
OUTLIER_SIGMA = 3.0
[ドキュメント]
class LatticeCalculationError(RuntimeError):
"""
格子定数計算におけるエラーを示す例外。
詳細説明:
この例外は、入力データが無効な場合、または最小二乗問題の解決中に
計算上の問題(例: 特異行列)が発生した場合に発生します。
"""
[ドキュメント]
@dataclass(slots=True)
class Observation:
"""
一つの回折観測データを保持するデータクラス。
詳細説明:
このクラスは、特定の回折面(hkl)に対応する生の回折位置、変換された位置、
計算で使用される重み、デザイン行列の行、および観測時の波長などの
詳細な情報を含みます。
:param serial_index: int: 観測データの通し番号。
:param h: int: ミラー指数 h。
:param k: int: ミラー指数 k。
:param l: int: ミラー指数 l。
:param raw_position: float: 生の回折位置データ。
:param transformed_position: float: 変換された回折位置データ(最小二乗法に使用)。
:param weight: float: この観測データに対する重み。
:param design_row: np.ndarray: この観測データに対応するデザイン行列の行。
:param wavelength: float: この観測で使用されたX線の波長。
"""
serial_index: int
h: int
k: int
l: int
raw_position: float
transformed_position: float
weight: float
design_row: np.ndarray
wavelength: float
[ドキュメント]
@dataclass(slots=True)
class LeastSquaresResult:
"""
最小二乗法の結果を保持するデータクラス。
詳細説明:
このクラスは、最小二乗法によって計算されたパラメータ、それらの標準偏差、
フィットされたy値、残差、観測の標準偏差、および使用された観測データなどを
格納します。外れ値として除外された観測データも追跡されます。
:param parameters: np.ndarray: フィットされたパラメータの配列。
:param parameter_sigma: np.ndarray: 各パラメータの標準偏差の配列。
:param fitted_y: np.ndarray: フィットモデルによって計算されたy値の配列。
:param residuals: np.ndarray: 実測y値とフィットされたy値の残差の配列。
:param observation_sigma: np.ndarray: 各観測の標準偏差の配列。
:param kept_observations: list[Observation]: 最小二乗法に使用された観測データのリスト。
:param covariance: np.ndarray: パラメータの共分散行列。
:param rejected_observations: list[Observation]: 外れ値として除外された観測データのリスト。デフォルトは空のリスト。
"""
parameters: np.ndarray
parameter_sigma: np.ndarray
fitted_y: np.ndarray
residuals: np.ndarray
observation_sigma: np.ndarray
kept_observations: list[Observation]
covariance: np.ndarray
rejected_observations: list[Observation] = field(default_factory=list)
[ドキュメント]
@dataclass(slots=True)
class CellConstants:
"""
格子定数(逆格子および実格子)を保持するデータクラス。
詳細説明:
逆格子と実格子の両方について、長さ、角度(度数)、コサイン、
体積およびそれぞれの標準偏差を格納します。
デフォルト値としてゼロまたは標準的な値(角度90度)が設定されています。
:param reciprocal_lengths: np.ndarray: 逆格子の長さ (a*, b*, c*) の配列。デフォルトは全て0.0。
:param reciprocal_length_sigma: np.ndarray: 逆格子の長さの標準偏差の配列。デフォルトは全て0.0。
:param reciprocal_angles_deg: np.ndarray: 逆格子の角度 (alpha*, beta*, gamma*) の配列(度数)。デフォルトは全て90.0。
:param reciprocal_angle_sigma_deg: np.ndarray: 逆格子の角度の標準偏差の配列(度数)。デフォルトは全て0.0。
:param reciprocal_cosines: np.ndarray: 逆格子の角度のコサイン (cos(alpha*), cos(beta*), cos(gamma*)) の配列。デフォルトは全て0.0。
:param reciprocal_cosine_sigma: np.ndarray: 逆格子の角度のコサインの標準偏差の配列。デフォルトは全て0.0。
:param reciprocal_volume: float: 逆格子の体積。デフォルトは0.0。
:param reciprocal_volume_sigma: float: 逆格子の体積の標準偏差。デフォルトは0.0。
:param direct_lengths: np.ndarray: 実格子の長さ (a, b, c) の配列。デフォルトは全て0.0。
:param direct_length_sigma: np.ndarray: 実格子の長さの標準偏差の配列。デフォルトは全て0.0。
:param direct_angles_deg: np.ndarray: 実格子の角度 (alpha, beta, gamma) の配列(度数)。デフォルトは全て90.0。
:param direct_angle_sigma_deg: np.ndarray: 実格子の角度の標準偏差の配列(度数)。デフォルトは全て0.0。
:param direct_cosines: np.ndarray: 実格子の角度のコサイン (cos(alpha), cos(beta), cos(gamma)) の配列。デフォルトは全て0.0。
:param direct_cosine_sigma: np.ndarray: 実格子の角度のコサインの標準偏差の配列。デフォルトは全て0.0。
:param direct_volume: float: 実格子の体積。デフォルトは0.0。
:param direct_volume_sigma: float: 実格子の体積の標準偏差。デフォルトは0.0。
"""
reciprocal_lengths: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
reciprocal_length_sigma: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
reciprocal_angles_deg: np.ndarray = field(default_factory=lambda: np.full(3, 90.0, dtype=float))
reciprocal_angle_sigma_deg: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
reciprocal_cosines: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
reciprocal_cosine_sigma: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
reciprocal_volume: float = 0.0
reciprocal_volume_sigma: float = 0.0
direct_lengths: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
direct_length_sigma: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
direct_angles_deg: np.ndarray = field(default_factory=lambda: np.full(3, 90.0, dtype=float))
direct_angle_sigma_deg: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
direct_cosines: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
direct_cosine_sigma: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=float))
direct_volume: float = 0.0
direct_volume_sigma: float = 0.0
[ドキュメント]
@dataclass(slots=True)
class JobConfig:
"""
格子定数計算ジョブの設定を保持するデータクラス。
詳細説明:
このクラスは、計算に使用する格子系の種類、適用する補正の種類、
観測データの重み付け方法、回折位置の変換方法、およびプロファイルモードの
設定情報をカプセル化します。
:param lattice_system: int: 格子系の種類を示す整数コード(例: 1=三斜晶, 4=直方晶など)。
:param correction_flags: list[int]: 5つの補正フラグのリスト。各フラグは特定の補正の適用有無を示す。
:param weight_mode: int: 観測データに適用する重み付けモード。
:param position_mode: int: 生の回折位置データを変換するモード。
:param profile_mode: int: プロファイル分析モード(このコードでは未使用だが、入力形式の一部)。
"""
lattice_system: int
correction_flags: list[int]
weight_mode: int
position_mode: int
profile_mode: int
[ドキュメント]
@dataclass(slots=True)
class JobData:
"""
一つの格子定数計算ジョブに関する全てのデータを保持するデータクラス。
詳細説明:
ジョブのタイトル、設定、処理する観測データのリスト、および
参照として使用される波長を含みます。
:param title: str: ジョブのタイトル。
:param config: JobConfig: このジョブの設定情報。
:param observations: list[Observation]: このジョブで処理される観測データのリスト。
:param reference_wavelength: float: このジョブの参照波長。
"""
title: str
config: JobConfig
observations: list[Observation]
reference_wavelength: float
[ドキュメント]
class LineReader:
"""
テキストストリームから行を読み込むためのヘルパークラス。
詳細説明:
このクラスは、テキストファイルを一行ずつ読み込み、現在の行番号を追跡し、
ファイルの予期せぬ終端を検出する機能を提供します。
空行でない行を確実に読み込むためのメソッドも備えています。
:param stream: TextIO: 読み込むテキストストリーム。
"""
def __init__(self, stream: TextIO):
"""
LineReaderの新しいインスタンスを初期化する。
:param stream: TextIO: 読み込むテキストストリーム。
"""
self.stream = stream
self.line_number = 0
[ドキュメント]
def read_line(self, *, allow_eof: bool = False) -> str | None:
"""
ストリームから一行を読み込む。
詳細説明:
ファイルの終端に達した場合、`allow_eof` が `True` であれば `None` を返します。
`False` の場合は `EOFError` を発生させます。読み込んだ行末の改行文字は除去されます。
:param allow_eof: bool: Trueの場合、ファイルの終端に達してもEOFErrorを発生させず、Noneを返す。デフォルトはFalse。
:returns: str | None: 読み込まれた行(末尾の改行文字は除去)。EOFでallow_eofがTrueの場合はNone。
:raises EOFError: `allow_eof`がFalseのときに、ファイルの予期せぬ終端に達した場合。
"""
line = self.stream.readline()
if line == "":
if allow_eof:
return None
raise EOFError("unexpected end of file")
self.line_number += 1
return line.rstrip("\n")
[ドキュメント]
def read_nonempty_line(self) -> str:
"""
ストリームから空でない行を読み込む。
詳細説明:
空行をスキップし、最初に見つかった空でない行を返します。
ファイルの終端に達した場合は `EOFError` を発生させます。
:returns: str: 読み込まれた空でない行(末尾の改行文字は除去)。
:raises EOFError: ファイルの終端に達した場合。
:raises AssertionError: 読み込まれた行がNoneであった場合 (内部的なガード)。
"""
line = self.read_line()
assert line is not None
return line
[ドキュメント]
class SigmaPropagator:
"""
誤差伝播の計算を行うクラス。
詳細説明:
このクラスは、複数の入力値とそれぞれの標準偏差が与えられたときに、
それらの入力値に依存する関数が返す値とその標準偏差を、
数値微分(摂動法)を用いて計算します。
:param values: Iterable[float]: 関数に渡す入力値のシーケンス。
:param sigmas: Iterable[float]: 各入力値の標準偏差のシーケンス。
"""
def __init__(self, values: Iterable[float], sigmas: Iterable[float]):
"""
SigmaPropagatorの新しいインスタンスを初期化する。
:param values: Iterable[float]: 関数に渡す入力値のシーケンス。
:param sigmas: Iterable[float]: 各入力値の標準偏差のシーケンス。
"""
self.values = np.array(list(values), dtype=float)
self.sigmas = np.array(list(sigmas), dtype=float)
[ドキュメント]
def evaluate(self, func: Callable[[np.ndarray], float]) -> tuple[float, float]:
"""
与えられた関数を評価し、その値と標準偏差を計算する。
詳細説明:
入力値の各要素を微小に摂動させ、その変化から数値微分を計算します。
これらの数値微分と各入力値の標準偏差を用いて、誤差伝播の法則により
関数の結果の標準偏差を導出します。
:param func: Callable[[np.ndarray], float]: 誤差伝播を計算する関数。
NumPy配列を受け取り、単一のfloatを返す必要があります。
:returns: tuple[float, float]: 関数の評価値と、その標準偏差のタプル。
"""
base = func(self.values.copy())
variance = 0.0
for idx, sigma in enumerate(self.sigmas):
if abs(sigma) < ANGLE_EPS:
continue
shifted = self.values.copy()
shifted[idx] += PERTURB_SCALE * sigma
deriv_sigma = (func(shifted) - base) / PERTURB_SCALE
variance += deriv_sigma * deriv_sigma
return base, math.sqrt(max(variance, 0.0))
# ---------- elementary math helpers ----------
[ドキュメント]
def clamp_unit(x: float) -> float:
"""
値を -1.0 から 1.0 の範囲にクランプする。
詳細説明:
主に `math.acos` の引数が有効な範囲内にあることを保証するために使用されます。
入力値が -1.0 未満であれば -1.0 に、1.0 を超えれば 1.0 に丸めます。
:param x: float: クランプする値。
:returns: float: -1.0 から 1.0 の範囲にクランプされた値。
"""
return max(-1.0, min(1.0, x))
[ドキュメント]
def safe_sqrt(x: float) -> float:
"""
非負の引数に対して安全に平方根を計算する。
詳細説明:
`math.sqrt` は負の引数に対して `ValueError` を発生させますが、
この関数は引数が負の場合でも0.0として処理することでエラーを回避し、
常に非負の値を返します。
:param x: float: 平方根を計算する値。
:returns: float: xの平方根。xが負の場合は0.0。
"""
return math.sqrt(max(0.0, x))
[ドキュメント]
def acos_deg(cosine: float) -> float:
"""
コサイン値から角度を度数で計算する。
詳細説明:
入力されたコサイン値が有効な範囲 [-1, 1] 内にあることを保証するために、
`clamp_unit` 関数で値をクランプしてから `math.acos` を適用します。
結果はラジアンから度数に変換されます。
:param cosine: float: コサイン値。
:returns: float: 度数で表された角度。
"""
return math.degrees(math.acos(clamp_unit(cosine)))
[ドキュメント]
def reciprocal_volume_from_cosines(cosines: np.ndarray) -> float:
"""
逆格子のコサイン値から逆格子単位胞体積の計算に用いる係数を計算する。
詳細説明:
逆格子単位胞体積 V* は、V* = a*b*c* * sqrt(1 - cos^2(alpha*) - cos^2(beta*) - cos^2(gamma*) + 2cos(alpha*)cos(beta*)cos(gamma*))
で与えられます。この関数は、上記式の `sqrt(...)` の部分を計算します。
:param cosines: np.ndarray: 逆格子の角度のコサイン値 (cos_alpha_star, cos_beta_star, cos_gamma_star) を含むNumPy配列。
:returns: float: 逆格子単位胞体積計算に使用される平方根の値。
"""
ca, cb, cg = cosines
term = 1.0 - ca * ca - cb * cb - cg * cg + 2.0 * ca * cb * cg
return safe_sqrt(term)
[ドキュメント]
def direct_cosine_from_reciprocal(values: np.ndarray) -> float:
"""
逆格子のコサイン値から実格子のコサイン値を計算する。
詳細説明:
逆格子の角度 (alpha*, beta*, gamma*) のコサイン値から、
対応する実格子の角度 (alpha, beta, gamma) のコサイン値を導出します。
例えば、実格子の cos(alpha) は逆格子の cos(alpha*), cos(beta*), cos(gamma*) から計算されます。
:param values: np.ndarray: (cos_alpha_star, cos_beta_star, cos_gamma_star) を含むNumPy配列。
:returns: float: 対応する実格子のコサイン値。
"""
cos_alpha_star, cos_beta_star, cos_gamma_star = values
sin_beta_star = safe_sqrt(1.0 - cos_beta_star * cos_beta_star)
sin_gamma_star = safe_sqrt(1.0 - cos_gamma_star * cos_gamma_star)
denom = sin_beta_star * sin_gamma_star
if denom <= ANGLE_EPS:
return 0.0
return (cos_beta_star * cos_gamma_star - cos_alpha_star) / denom
[ドキュメント]
def direct_length_from_reciprocal(values: np.ndarray) -> float:
"""
逆格子の長さとコサイン値から実格子の長さを計算する。
詳細説明:
逆格子定数 (a*, cos_alpha*, cos_beta*, cos_gamma*) から、
対応する実格子の長さ (a) を導出します。
例: 実格子の長さaは、逆格子のa*, cos(alpha*), cos(beta*), cos(gamma*) から計算されます。
:param values: np.ndarray: (a_star, cos_alpha_star, cos_beta_star, cos_gamma_star) を含むNumPy配列。
:returns: float: 対応する実格子の長さ。
"""
a_star, cos_alpha_star, cos_beta_star, cos_gamma_star = values
if abs(a_star) <= ANGLE_EPS:
return 0.0
sin_alpha_star = safe_sqrt(1.0 - cos_alpha_star * cos_alpha_star)
v_star = reciprocal_volume_from_cosines(np.array([cos_alpha_star, cos_beta_star, cos_gamma_star]))
if v_star <= ANGLE_EPS:
return 0.0
return sin_alpha_star / (a_star * v_star)
[ドキュメント]
def direct_volume_from_reciprocal(values: np.ndarray) -> float:
"""
逆格子の長さとコサイン値から実格子の体積を計算する。
詳細説明:
逆格子定数 (a*, b*, c*, cos_alpha*, cos_beta*, cos_gamma*) から
実格子の体積を導出します。実格子の体積は逆格子の体積の逆数です。
:param values: np.ndarray: (a_star, b_star, c_star, cos_alpha_star, cos_beta_star, cos_gamma_star) を含むNumPy配列。
:returns: float: 実格子の体積。
"""
a_star, b_star, c_star, cos_alpha_star, cos_beta_star, cos_gamma_star = values
v_star = a_star * b_star * c_star * reciprocal_volume_from_cosines(
np.array([cos_alpha_star, cos_beta_star, cos_gamma_star])
)
if v_star <= ANGLE_EPS:
return 0.0
return 1.0 / v_star
[ドキュメント]
def trigonal_direct_length(values: np.ndarray) -> float:
"""
三方晶系の逆格子定数から実格子の長さを計算する。
詳細説明:
三方晶系において、逆格子のa*とcos(alpha*)から実格子の長さaを導出します。
等方的な格子定数を持つ三方晶系(菱面体晶)の特殊なケースに適用されます。
:param values: np.ndarray: (a_star, cos_alpha_star) を含むNumPy配列。
:returns: float: 三方晶系における実格子の長さa。
"""
a_star, cos_alpha_star = values
if abs(a_star) <= ANGLE_EPS:
return 0.0
numerator = 1.0 - cos_alpha_star * cos_alpha_star
denominator = 1.0 - 3.0 * cos_alpha_star * cos_alpha_star + 2.0 * cos_alpha_star**3
if denominator <= ANGLE_EPS or numerator < 0.0:
return 0.0
return safe_sqrt(numerator / denominator) / a_star
[ドキュメント]
def trigonal_direct_cosine(values: np.ndarray) -> float:
"""
三方晶系の逆格子のコサイン値から実格子のコサイン値を計算する。
詳細説明:
三方晶系において、逆格子のcos(alpha*)から実格子のcos(alpha)を導出します。
:param values: np.ndarray: (cos_alpha_star,) を含むNumPy配列。
:returns: float: 三方晶系における実格子のコサイン値cos(alpha)。
"""
(cos_alpha_star,) = values
denominator = 1.0 - cos_alpha_star * cos_alpha_star
if abs(denominator) <= ANGLE_EPS:
return 0.0
return cos_alpha_star * (cos_alpha_star - 1.0) / denominator
[ドキュメント]
def trigonal_direct_volume(values: np.ndarray) -> float:
"""
三方晶系の逆格子定数から実格子の体積を計算する。
詳細説明:
三方晶系において、逆格子のa*とcos(alpha*)から実格子の体積Vを導出します。
:param values: np.ndarray: (a_star, cos_alpha_star) を含むNumPy配列。
:returns: float: 三方晶系における実格子の体積V。
"""
a_star, cos_alpha_star = values
term = 1.0 - 3.0 * cos_alpha_star * cos_alpha_star + 2.0 * cos_alpha_star**3
if term <= 0.0:
return 0.0
v_star = a_star**3 * safe_sqrt(term)
if v_star <= ANGLE_EPS:
return 0.0
return 1.0 / v_star
# ---------- model construction ----------
[ドキュメント]
def base_design_terms(h: int, k: int, l: int) -> np.ndarray:
"""
回折指数 (h, k, l) からデザイン行列の基本的な項を計算する。
詳細説明:
格子定数計算におけるデザイン行列を構築するための基本要素として、
h^2, k^2, l^2, 2kl, 2lh, 2hk の6つの項を計算します。
これらの項は、一般的な三斜晶系を含む様々な格子系のデザイン行列の基礎となります。
:param h: int: ミラー指数h。
:param k: int: ミラー指数k。
:param l: int: ミラー指数l。
:returns: np.ndarray: 6つのデザイン行列項を含むNumPy配列。
"""
return np.array(
[
float(h * h),
float(k * k),
float(l * l),
2.0 * float(k * l),
2.0 * float(l * h),
2.0 * float(h * k),
],
dtype=float,
)
[ドキュメント]
def lattice_design_row(lattice_system: int, h: int, k: int, l: int) -> np.ndarray:
"""
指定された格子系とミラー指数に基づいて、デザイン行列の行を構築する。
詳細説明:
各格子系(三斜晶、単斜晶、直方晶、正方晶、六方晶、立方晶、菱面体晶)は、
結晶学的に異なる格子定数の制約を持ちます。この関数は、その制約に基づいて、
`base_design_terms` から必要な項を選択・結合し、デザイン行列の1行を生成します。
格子系のコードと対応するパラメータ:
- 1: 三斜晶 (triclinic): a*^2, b*^2, c*^2, 2b*c*cos(alpha*), 2c*a*cos(beta*), 2a*b*cos(gamma*)
- 2: 単斜晶 (monoclinic, unique b): a*^2, b*^2, c*^2, 2c*a*cos(beta*)
- 3: 単斜晶 (monoclinic, unique c): a*^2, b*^2, c*^2, 2a*b*cos(gamma*) (※コードではgamma*として実装されているが、入力形式によってはalpha*を意味する場合もある)
- 4: 直方晶 (orthorhombic): a*^2, b*^2, c*^2
- 5: 正方晶 (tetragonal): a*^2, c*^2 (a*=b*, alpha*=beta*=gamma*=90deg)
- 6: 立方晶 (cubic): a*^2 (a*=b*=c*, alpha*=beta*=gamma*=90deg)
- 7: 菱面体晶 (rhombohedral, trigonal): a*^2+b*^2+c*^2, 2(kl+lh+hk) (a*=b*=c*, alpha*=beta*=gamma*)
- 8: 六方晶 (hexagonal): a*^2, c*^2 (a*=b*, alpha*=beta*=90deg, gamma*=120deg)
:param lattice_system: int: 格子系の種類を示す整数コード(1-8)。
:param h: int: ミラー指数h。
:param k: int: ミラー指数k。
:param l: int: ミラー指数l。
:returns: np.ndarray: 指定された格子系のデザイン行列の行。
:raises LatticeCalculationError: 未対応の格子系が指定された場合。
"""
x = base_design_terms(h, k, l)
if lattice_system == 1: # Triclinic
return x.copy()
if lattice_system == 2: # Monoclinic (unique b axis, beta*)
return np.array([x[0], x[1], x[2], x[4]], dtype=float) # h^2, k^2, l^2, 2lh
if lattice_system == 3: # Monoclinic (unique c axis, gamma*) or (unique a axis, alpha*)
return np.array([x[0], x[1], x[2], x[5]], dtype=float) # h^2, k^2, l^2, 2hk
if lattice_system == 4: # Orthorhombic
return np.array([x[0], x[1], x[2]], dtype=float) # h^2, k^2, l^2
if lattice_system == 5: # Tetragonal
return np.array([x[0] + x[1], x[2]], dtype=float) # h^2+k^2, l^2
if lattice_system == 6: # Cubic
return np.array([x[0] + x[1] + x[2]], dtype=float) # h^2+k^2+l^2
if lattice_system == 7: # Rhombohedral (trigonal)
return np.array([x[0] + x[1] + x[2], x[3] + x[4] + x[5]], dtype=float) # h^2+k^2+l^2, 2(kl+lh+hk)
if lattice_system == 8: # Hexagonal
return np.array([x[0] + x[1] + 0.5 * x[5], x[2]], dtype=float) # h^2+k^2+hk, l^2
raise LatticeCalculationError(f"unsupported lattice system: {lattice_system}")
[ドキュメント]
def calc_weight(weight_mode: int, transformed: float, sigma_or_weight: float | None) -> float:
"""
指定された重みモードと変換された位置データに基づいて観測の重みを計算する。
詳細説明:
最小二乗法では、各観測データの信頼度に応じて重みを設定することで、
より信頼性の高いデータに大きな影響を与えることができます。
この関数は、異なる重み付けスキーム(一定、測定誤差に基づくなど)を実装しています。
重みモードの種類:
- 0: シグマ・シータに基づく重み。transformed_positionがsin(theta)である場合、1/(sigma_theta^2 * 4 * sin^2(theta) * cos^2(theta)) に比例。
- 1: シグマ・ツーシータに基づく重み。transformed_positionがsin(theta)である場合、1/(sigma_2theta^2 * 4 * sin^2(theta) * cos^2(theta)) に比例。
- 2: 一定の重み (1.0)。
- 3: 直接指定された重み。
- 4: シグマ値の逆二乗に基づく重み (1/sigma^2)。
:param weight_mode: int: 重み計算モード(0-4)。
:param transformed: float: 変換された回折位置データ(通常はsin(θ))。
:param sigma_or_weight: float | None: モードによって、回折角度の標準偏差または直接指定される重み値。Noneの場合もある。
:returns: float: 計算された観測の重み。
:raises LatticeCalculationError: 重みモードに必要な追加データが提供されない場合。
"""
bas = transformed * transformed # transformed position squared (sin^2(theta) if transformed is sin(theta))
if weight_mode == 0:
if sigma_or_weight is None:
raise LatticeCalculationError("weight mode 0 requires an additional sigma line")
sigma_theta = 0.25 * sigma_or_weight # sigma_or_weight is 2*sigma_2theta in some old contexts
sigma_theta = max(sigma_theta, 0.25) # Ensure minimum sigma_theta
denom = bas * (1.0 - bas)
# Original Fortran implied 1 / (sigma_theta^2 * 4 * transformed^2 * (1-transformed^2))
# The Python code is `sigma_theta / denom`, which is unusual for 1/variance.
# However, following the existing code logic:
return 0.0 if abs(denom) <= ANGLE_EPS else sigma_theta / denom
if weight_mode == 1:
if sigma_or_weight is None:
raise LatticeCalculationError("weight mode 1 requires an additional sigma line")
sigma_two_theta = 2.0 * sigma_or_weight # sigma_two_theta is related to two_theta
denom = bas * (1.0 - bas) * sigma_two_theta * sigma_two_theta
# Original Fortran implied 1 / (sigma_2theta^2 * 4 * transformed^2 * (1-transformed^2))
# The Python code is `0.25 / denom`, which is `1 / (4 * denom)` if denom already includes sigma_2theta^2.
return 0.0 if abs(denom) <= ANGLE_EPS else 0.25 / denom
if weight_mode == 2:
return 1.0
if weight_mode == 3:
if sigma_or_weight is None:
raise LatticeCalculationError("weight mode 3 requires an additional weight line")
return sigma_or_weight if sigma_or_weight > 0.0 else 1.0
if weight_mode == 4:
if sigma_or_weight is None:
raise LatticeCalculationError("weight mode 4 requires an additional sigma line")
# Weight = 1/sigma^2
return 0.0 if sigma_or_0weight == 0.0 else 1.0 / (sigma_or_weight * sigma_or_weight)
raise LatticeCalculationError(f"unsupported weight mode: {weight_mode}")
[ドキュメント]
def correction_terms(flags: list[int], transformed: float) -> list[float]:
"""
指定された補正フラグと変換された位置データに基づいて補正項を計算する。
詳細説明:
回折データの測定には、様々な系統誤差(例:吸収、分極、ローレンツ因子など)
が影響を与える可能性があります。この関数は、これらの誤差を補正するための
特定の補正項を計算し、最小二乗法のデザイン行列に追加できるようにします。
`transformed` は通常 sin(θ) に対応します。
:param flags: list[int]: 5つの補正フラグのリスト。各フラグは特定の補正の有無を示す(0:なし、1:あり)。
:param transformed: float: 変換された回折位置データ(通常はsin(θ))。
:returns: list[float]: 計算された補正項のリスト。
"""
bas = transformed * transformed # sin(theta)^2
theta = math.asin(clamp_unit(transformed)) # theta in radians
terms: list[float] = []
if flags[0]:
# Correction 1: absorption? or 1/sin(2θ)? The original Fortran code suggests a zero shift correction
# A.k.a. "dθ_c" in some contexts. This form resembles d_c * cot(theta)
terms.append((math.pi / 2.0 - theta) * math.tan(math.pi / 2.0 - theta))
if flags[1]:
# Correction 2: polarization factor, or L.P. factor component
# sin^2(2θ) term, or 4 * sin^2(θ) * cos^2(θ)
terms.append(4.0 * bas * (1.0 - bas))
if flags[2]:
# Correction 3: usually related to sample transparency or beam divergence
# The term 1/transformed + 1/theta is unusual.
# If transformed = sin(theta), then 1/sin(theta) + 1/theta.
# This term is only added if transformed and theta are not extremely small.
if abs(transformed) <= ANGLE_EPS or abs(theta) <= ANGLE_EPS:
terms.append(0.0)
else:
terms.append(4.0 * bas * (1.0 - bas) * (1.0 / transformed + 1.0 / theta))
if flags[3]:
# Correction 4: Often related to preferred orientation or sample height error
# sin(2θ) term
terms.append(math.sin(2.0 * theta))
if flags[4]:
# Correction 5: Another component, possibly related to preferred orientation or asymmetric peak broadening
# sin(2θ) * cos(θ) term
terms.append(math.sin(2.0 * theta) * math.cos(theta))
return terms
[ドキュメント]
def read_numeric_line(reader: LineReader, expected: int) -> list[float]:
"""
リーダーから一行を読み込み、指定された数の浮動小数点数に変換する。
詳細説明:
入力ファイルから設定値やパラメータを読み込む際に使用されるヘルパー関数。
読み込んだ行をスペースで分割し、最初の`expected`個の要素を浮動小数点数に変換します。
:param reader: LineReader: 入力ファイルを読み込むためのLineReaderインスタンス。
:param expected: int: 期待される数値の最小数。
:returns: list[float]: 読み込まれた数値のリスト。
:raises LatticeCalculationError: 期待されるよりも少ない数値が読み込まれた場合。
"""
raw = reader.read_nonempty_line().split()
if len(raw) < expected:
raise LatticeCalculationError(
f"line {reader.line_number}: expected at least {expected} numeric values, got {len(raw)}"
)
return [float(x) for x in raw[:expected]]
[ドキュメント]
def parse_jobs(stream: TextIO) -> list[JobData]:
"""
入力ストリームから複数の格子定数計算ジョブを解析する。
詳細説明:
入力ファイルは、一連の計算ジョブで構成されます。各ジョブはタイトル、
計算設定、および複数の回折観測データを含みます。この関数は、
ストリームの終端に達するか、特定の終了コード(`h=1000`で次のジョブ、`job_code=0`で全体終了)
が読み込まれるまで、ジョブを順次解析して `JobData` オブジェクトのリストを生成します。
入力ファイルの主要なセクション:
1. ジョブタイトル
2. ジョブ設定パラメータ(格子系、補正フラグ、モードなど)
3. 波長と装置半径
4. 回折観測データのブロック (h k l raw_position)
- (position_mode=3の場合) `zero_shift`
- (weight_mode in {0,1,3,4}の場合) `sigma_or_weight`
- 特殊行: `2000 wavelength radius` で波長と半径を更新
- 特殊行: `1000 ...` で現在のジョブの終了を示す
5. ジョブ終了コード (0: 全体の終了, 1: 次のジョブへ)
:param stream: TextIO: 入力データを含むテキストストリーム。
:returns: list[JobData]: 解析されたJobDataオブジェクトのリスト。
:raises EOFError: ファイルの予期せぬ終端に達した場合。
:raises LatticeCalculationError: 入力ファイルの形式が無効な場合(例: ヘッダー情報不足、反射行のデータ不足)。
"""
reader = LineReader(stream)
jobs: list[JobData] = []
while True:
title = reader.read_line(allow_eof=True)
if title is None:
break # EOF encountered
title = title.strip()
if title == "":
continue # Skip empty title lines
# Read job configuration parameters
params = reader.read_nonempty_line().split()
if len(params) < 9:
raise LatticeCalculationError(f"line {reader.line_number}: invalid job header")
lattice_system, *raw_flags, weight_mode, position_mode, profile_mode = map(int, params[:9])
correction_flags = raw_flags[:5] # Take first 5 flags
config = JobConfig(
lattice_system=lattice_system,
correction_flags=correction_flags,
weight_mode=weight_mode,
position_mode=position_mode,
profile_mode=profile_mode,
)
# Read initial wavelength and radius
wavelength, radius = read_numeric_line(reader, 2)
reference_wavelength = wavelength
observations: list[Observation] = []
serial_index = 0
# Read reflection data
line = reader.read_nonempty_line()
while True:
parts = line.split()
if len(parts) < 4:
raise LatticeCalculationError(f"line {reader.line_number}: invalid reflection line")
h, k, l = map(int, parts[:3])
raw_position = float(parts[3])
if not (h < 1000 or h == 2000): # Check for job termination (h >= 1000 and h != 2000)
break
if h == 2000: # Special line: update wavelength and radius
wavelength, radius = read_numeric_line(reader, 2)
if len(observations) == 0: # If this is the first wavelength for the job
reference_wavelength = wavelength
line = reader.read_nonempty_line()
continue
serial_index += 1
zero_shift = None
if position_mode == 3:
zero_shift = float(reader.read_nonempty_line().strip())
transformed = transformed_position(raw_position, position_mode, wavelength, radius, zero_shift)
sigma_or_weight = None
if weight_mode in {0, 1, 3, 4}:
sigma_or_weight = float(reader.read_nonempty_line().strip())
weight = calc_weight(weight_mode, transformed, sigma_or_weight)
# Construct design row
row = lattice_design_row(lattice_system, h, k, l)
row = np.concatenate([row, np.array(correction_terms(correction_flags, transformed), dtype=float)])
# The y-value for least squares, typically proportional to 1/d^2
# Here it's 4.0 / (lambda^2) * sin^2(theta) = 1/d^2, if transformed is sin(theta)
# The original code comment says "transformed_position (for least square)" and is (4.0/lambda^2)*sin^2(theta)
# If transformed is already sin(theta), then this is 1/d^2.
y_value = (4.0 / (wavelength * wavelength)) * transformed * transformed
observations.append(
Observation(
serial_index=serial_index,
h=h,
k=k,
l=l,
raw_position=raw_position,
transformed_position=y_value,
weight=weight,
design_row=row,
wavelength=wavelength,
)
)
line = reader.read_nonempty_line()
jobs.append(JobData(title=title, config=config, observations=observations, reference_wavelength=reference_wavelength))
# Read job termination code
job_code = int(reader.read_nonempty_line().strip())
if job_code == 0:
break # All jobs finished
return jobs
# ---------- least squares ----------
[ドキュメント]
def weighted_least_squares(observations: list[Observation], output: TextIO) -> LeastSquaresResult:
"""
観測データに対して重み付き最小二乗法を実行する。
詳細説明:
与えられた観測データに対し、重み付き最小二乗法を用いてモデルパラメータを推定します。
このプロセスには、計算された残差に基づいて外れ値を自動的に検出し、除去する機能が含まれます。
外れ値の除去は、安定したパラメータ推定値が得られるまで繰り返されます。
最終的なレポートは `output` ストリームに書き出されません。
:param observations: list[Observation]: 最小二乗法に使用する観測データのリスト。
:param output: TextIO: 現在のところ、この関数からは直接出力されません。将来的なログ出力用。
:returns: LeastSquaresResult: 最小二乗法の計算結果、フィットされたパラメータ、残差、使用および除外された観測データを含むオブジェクト。
:raises LatticeCalculationError: 観測データが提供されない場合、またはデザイン行列が特異な場合。
"""
if not observations:
raise LatticeCalculationError("no observations were provided")
current = observations[:]
rejected: list[Observation] = []
n_params = current[0].design_row.size
while True:
y = np.array([obs.transformed_position for obs in current], dtype=float)
w = np.array([obs.weight for obs in current], dtype=float)
x = np.vstack([obs.design_row for obs in current])
if len(current) < n_params:
raise LatticeCalculationError("number of observations is smaller than number of parameters")
normal = x.T @ (w[:, None] * x)
rhs = x.T @ (w * y)
try:
covariance_unscaled = np.linalg.inv(normal)
except np.linalg.LinAlgError as exc:
raise LatticeCalculationError("normal matrix is singular") from exc
params = covariance_unscaled @ rhs
fitted = x @ params
residuals = y - fitted
dof = len(current) - n_params
if dof == 0:
sigma_obs = np.zeros(len(current), dtype=float)
sigma_params = np.zeros(n_params, dtype=float)
covariance = covariance_unscaled.copy()
return LeastSquaresResult(
params,
sigma_params,
fitted,
residuals,
sigma_obs,
current,
covariance,
rejected,
)
weighted_ss = float(np.sum(w * residuals * residuals))
scale = math.sqrt(weighted_ss / dof)
sigma_obs = np.where(w > 0.0, scale / np.sqrt(w), 0.0)
outlier_mask = np.abs(residuals) > OUTLIER_SIGMA * sigma_obs
if np.any(outlier_mask):
removed = [obs for obs, flag in zip(current, outlier_mask) if flag]
rejected.extend(removed)
current = [obs for obs, flag in zip(current, outlier_mask) if not flag]
continue
covariance = covariance_unscaled * (scale * scale)
sigma_params = np.sqrt(np.maximum(np.diag(covariance), 0.0))
return LeastSquaresResult(
params,
sigma_params,
fitted,
residuals,
sigma_obs,
current,
covariance,
rejected,
)
# ---------- post-processing ----------
[ドキュメント]
def derive_cell_constants(lattice_system: int, params: np.ndarray, sigma: np.ndarray) -> CellConstants:
"""
最小二乗法で得られたパラメータから逆格子および実格子の定数を導出する。
詳細説明:
最小二乗法によって決定された格子パラメータとそれらの標準偏差を受け取り、
指定された格子系の結晶学的な関係に基づいて、最終的な逆格子定数
(a*, b*, c*, alpha*, beta*, gamma*, V*) および実格子定数
(a, b, c, alpha, beta, gamma, V) を計算します。
各定数に対して誤差伝播法を用いて標準偏差も同時に計算されます。
:param lattice_system: int: 格子系の種類を示す整数コード(1-8)。
:param params: np.ndarray: 最小二乗法で得られたパラメータの配列。
:param sigma: np.ndarray: パラメータの標準偏差の配列。
:returns: CellConstants: 導出された格子定数を含むオブジェクト。
:raises LatticeCalculationError: 未対応の格子系が指定された場合。
"""
cell = CellConstants()
def reciprocal_to_direct_general(rec_lengths: np.ndarray, rec_length_sigma: np.ndarray, rec_cos: np.ndarray, rec_cos_sigma: np.ndarray) -> None:
"""
一般的な逆格子定数から実格子定数を導出し、CellConstantsオブジェクトに格納するヘルパー関数。
詳細説明:
このヘルパー関数は、逆格子の長さ、コサイン、およびそれらの標準偏差から、
実格子の長さ、コサイン、角度、体積、およびそれらの標準偏差を計算し、
`cell` オブジェクトの対応するフィールドに格納します。
誤差伝播には `SigmaPropagator` が使用されます。
:param rec_lengths: np.ndarray: 逆格子の長さ (a*, b*, c*) の配列。
:param rec_length_sigma: np.ndarray: 逆格子の長さの標準偏差の配列。
:param rec_cos: np.ndarray: 逆格子の角度のコサイン (cos(alpha*), cos(beta*), cos(gamma*)) の配列。
:param rec_cos_sigma: np.ndarray: 逆格子の角度のコサインの標準偏差の配列。
"""
cell.reciprocal_lengths[:] = rec_lengths
cell.reciprocal_length_sigma[:] = rec_length_sigma
cell.reciprocal_cosines[:] = rec_cos
cell.reciprocal_cosine_sigma[:] = rec_cos_sigma
for i in range(3):
# Calculate reciprocal angles and their sigmas
cell.reciprocal_angles_deg[i] = acos_deg(rec_cos[i])
cell.reciprocal_angle_sigma_deg[i] = SigmaPropagator([rec_cos[i]], [rec_cos_sigma[i]]).evaluate(lambda v: acos_deg(v[0]))[1]
# Propagate error for direct volume
prop = SigmaPropagator(
[*rec_lengths, *rec_cos],
[*rec_length_sigma, *rec_cos_sigma],
)
cell.direct_volume, cell.direct_volume_sigma = prop.evaluate(direct_volume_from_reciprocal)
# Calculate reciprocal volume (inverse of direct volume)
if cell.direct_volume > 0.0:
cell.reciprocal_volume = 1.0 / cell.direct_volume
# Error propagation for 1/x: d(1/x) = -1/x^2 dx => sigma(1/x) = dx/x^2
cell.reciprocal_volume_sigma = cell.direct_volume_sigma / (cell.direct_volume * cell.direct_volume)
for i in range(3):
j = (i + 1) % 3
k = (i + 2) % 3
# Calculate direct cosines and their sigmas
cell.direct_cosines[i], cell.direct_cosine_sigma[i] = SigmaPropagator(
[rec_cos[i], rec_cos[j], rec_cos[k]],
[rec_cos_sigma[i], rec_cos_sigma[j], rec_cos_sigma[k]],
).evaluate(direct_cosine_from_reciprocal)
# Calculate direct angles and their sigmas
cell.direct_angles_deg[i] = acos_deg(cell.direct_cosines[i])
cell.direct_angle_sigma_deg[i] = SigmaPropagator(
[cell.direct_cosines[i]], [cell.direct_cosine_sigma[i]]
).evaluate(lambda v: acos_deg(v[0]))[1]
# Calculate direct lengths and their sigmas
cell.direct_lengths[i], cell.direct_length_sigma[i] = SigmaPropagator(
[rec_lengths[i], rec_cos[i], rec_cos[j], rec_cos[k]],
[rec_length_sigma[i], rec_cos_sigma[i], rec_cos_sigma[j], rec_cos_sigma[k]],
).evaluate(direct_length_from_reciprocal)
if lattice_system == 1: # Triclinic
# Params: a*^2, b*^2, c*^2, 2b*c*cos(alpha*), 2c*a*cos(beta*), 2a*b*cos(gamma*)
rec_lengths = np.sqrt(np.maximum(params[:3], 0.0))
# Error propagation for sqrt(x): d(sqrt(x)) = 0.5 * dx / sqrt(x)
rec_length_sigma = np.divide(sigma[:3], 2.0 * np.maximum(rec_lengths, ANGLE_EPS), out=np.zeros(3), where=rec_lengths > ANGLE_EPS)
rec_cos = np.array([
params[3] / (rec_lengths[1] * rec_lengths[2]) if rec_lengths[1] * rec_lengths[2] > ANGLE_EPS else 0.0,
params[4] / (rec_lengths[2] * rec_lengths[0]) if rec_lengths[2] * rec_lengths[0] > ANGLE_EPS else 0.0,
params[5] / (rec_lengths[0] * rec_lengths[1]) if rec_lengths[0] * rec_lengths[1] > ANGLE_EPS else 0.0,
])
rec_cos_sigma = np.zeros(3, dtype=float)
for i in range(3):
j = (i + 1) % 3
k = (i + 2) % 3
# Error propagation for x/(y*z)
rec_cos_sigma[i] = SigmaPropagator(
[params[3 + i], rec_lengths[j], rec_lengths[k]],
[sigma[3 + i], rec_length_sigma[j], rec_length_sigma[k]],
).evaluate(lambda v: 0.0 if abs(v[1] * v[2]) <= ANGLE_EPS else v[0] / (v[1] * v[2]))[1]
reciprocal_to_direct_general(rec_lengths, rec_length_sigma, rec_cos, rec_cos_sigma)
return cell
if lattice_system == 2: # Monoclinic (unique b axis, beta*)
# Parameters: a*^2, b*^2, c*^2, 2c*a*cos(beta*)
# Missing 2b*c*cos(alpha*) (0), 2a*b*cos(gamma*) (0)
expanded_p = np.array([params[0], params[1], params[2], 0.0, params[3], 0.0], dtype=float)
expanded_s = np.array([sigma[0], sigma[1], sigma[2], 0.0, sigma[3], 0.0], dtype=float)
return derive_cell_constants(1, expanded_p, expanded_s)
if lattice_system == 3: # Monoclinic (unique c axis, gamma*) (as implemented)
# Parameters: a*^2, b*^2, c*^2, 2a*b*cos(gamma*)
# Missing 2b*c*cos(alpha*) (0), 2c*a*cos(beta*) (0)
expanded_p = np.array([params[0], params[1], params[2], 0.0, 0.0, params[3]], dtype=float)
expanded_s = np.array([sigma[0], sigma[1], sigma[2], 0.0, 0.0, sigma[3]], dtype=float)
return derive_cell_constants(1, expanded_p, expanded_s)
if lattice_system == 4: # Orthorhombic
# Parameters: a*^2, b*^2, c*^2 (all angles are 90 deg, cosines are 0)
rec_lengths = np.sqrt(np.maximum(params[:3], 0.0))
rec_length_sigma = np.divide(sigma[:3], 2.0 * np.maximum(rec_lengths, ANGLE_EPS), out=np.zeros(3), where=rec_lengths > ANGLE_EPS)
reciprocal_to_direct_general(rec_lengths, rec_length_sigma, np.zeros(3), np.zeros(3)) # Reciprocal cosines and their sigmas are 0
return cell
if lattice_system == 5: # Tetragonal
# Parameters: a*^2, c*^2 (a*=b*, all angles 90 deg)
a_star = safe_sqrt(params[0])
c_star = safe_sqrt(params[1])
da_star = 0.0 if a_star <= ANGLE_EPS else 0.5 * sigma[0] / a_star
dc_star = 0.0 if c_star <= ANGLE_EPS else 0.5 * sigma[1] / c_star
cell.reciprocal_lengths[:] = [a_star, a_star, c_star]
cell.reciprocal_length_sigma[:] = [da_star, da_star, dc_star]
# Direct lengths a = 1/a*, c = 1/c*
a = 0.0 if a_star <= ANGLE_EPS else 1.0 / a_star
c = 0.0 if c_star <= ANGLE_EPS else 1.0 / c_star
cell.direct_lengths[:] = [a, a, c]
cell.direct_length_sigma[:] = [da_star / max(a_star * a_star, ANGLE_EPS), da_star / max(a_star * a_star, ANGLE_EPS), dc_star / max(c_star * c_star, ANGLE_EPS)]
cell.direct_volume = a * a * c # For tetragonal, direct_volume = a^2 * c
cell.direct_volume_sigma = SigmaPropagator([a, a, c], [da_star / max(a_star * a_star, ANGLE_EPS), da_star / max(a_star * a_star, ANGLE_EPS), dc_star / max(c_star * c_star, ANGLE_EPS)]).evaluate(lambda v: v[0] * v[1] * v[2])[1] # Propagate error for a*a*c
if cell.direct_volume > 0.0:
cell.reciprocal_volume = 1.0 / cell.direct_volume
cell.reciprocal_volume_sigma = cell.direct_volume_sigma / (cell.direct_volume * cell.direct_volume)
return cell
if lattice_system == 6: # Cubic
# Parameters: a*^2 (a*=b*=c*, all angles 90 deg)
a_star = safe_sqrt(params[0])
da_star = 0.0 if a_star <= ANGLE_EPS else 0.5 * sigma[0] / a_star
cell.reciprocal_lengths[:] = [a_star, a_star, a_star]
cell.reciprocal_length_sigma[:] = [da_star, da_star, da_star]
# Direct length a = 1/a*
a = 0.0 if a_star <= ANGLE_EPS else 1.0 / a_star
da = da_star / max(a_star * a_star, ANGLE_EPS)
cell.direct_lengths[:] = [a, a, a]
cell.direct_length_sigma[:] = [da, da, da]
cell.direct_volume = a**3
cell.direct_volume_sigma = 3.0 * a * a * da # For a^3, sigma is 3*a^2*da
if cell.direct_volume > 0.0:
cell.reciprocal_volume = 1.0 / cell.direct_volume
cell.reciprocal_volume_sigma = cell.direct_volume_sigma / (cell.direct_volume * cell.direct_volume)
return cell
if lattice_system == 7: # Rhombohedral (trigonal)
# Parameters: (h^2+k^2+l^2) term, 2(kl+lh+hk) term
# With a*=b*=c* and alpha*=beta*=gamma*, these terms relate to a*^2 and a*^2*cos(alpha*).
# P0_design = a*^2 (h^2+k^2+l^2) -> P0_fit * (coeff_hkl_0)
# P1_design = 2a*^2 cos(alpha*) (kl+lh+hk) -> P1_fit * (coeff_hkl_1)
# Specifically, for rhombohedral system, the parameters are:
# P0 = (a*^2) and P1 = (a*^2 * cos_alpha_star) based on formula.
# However, the design matrix for rhombohedral uses (h^2+k^2+l^2) and 2*(kl+lh+hk).
# And the parameters fitted (params[0], params[1]) correspond to (a*^2) and (a*^2 * cos(alpha*)).
# (params[0] is for (h^2+k^2+l^2) which is 3*a*^2 in reciprocal space if coeffs for a*^2 and 2*a*^2*cos(a*) are implicit)
# Reinterpreting based on the design_row logic and the common formulation:
# For rhombohedral, design row is [h^2+k^2+l^2, 2(kl+lh+hk)]
# And the equation is (h*a*)^2 + (k*b*)^2 + (l*c*)^2 + 2klb*c*cos(a*) + ...
# For a*=b*=c* and alpha*=beta*=gamma*, the formula for 1/d^2 becomes:
# 1/d^2 = a*^2 * (h^2+k^2+l^2) + 2*a*^2*cos(alpha*) * (kl+lh+hk)
# So, params[0] = a*^2 and params[1] = 2*a*^2*cos(alpha*)
# From these: a* = sqrt(params[0])
# cos(alpha*) = params[1] / (2 * params[0])
a_star = safe_sqrt(params[0])
da_star = 0.0 if a_star <= ANGLE_EPS else 0.5 * sigma[0] / a_star
cos_alpha_star = 0.0 if abs(params[0]) <= ANGLE_EPS else params[1] / (2.0 * params[0])
# Error propagation for cos_alpha_star = params[1] / (2 * params[0])
dcos_alpha_star = SigmaPropagator(
[params[1], params[0]],
[sigma[1], sigma[0]]
).evaluate(lambda v: 0.0 if abs(v[1]) <= ANGLE_EPS else v[0] / (2.0 * v[1]))[1]
cell.reciprocal_lengths[:] = [a_star, a_star, a_star]
cell.reciprocal_length_sigma[:] = [da_star, da_star, da_star]
cell.reciprocal_cosines[:] = [cos_alpha_star, cos_alpha_star, cos_alpha_star]
cell.reciprocal_cosine_sigma[:] = [dcos_alpha_star, dcos_alpha_star, dcos_alpha_star]
angle_star = acos_deg(cos_alpha_star)
dangle_star = SigmaPropagator([cos_alpha_star], [dcos_alpha_star]).evaluate(lambda v: acos_deg(v[0]))[1]
cell.reciprocal_angles_deg[:] = [angle_star, angle_star, angle_star]
cell.reciprocal_angle_sigma_deg[:] = [dangle_star, dangle_star, dangle_star]
# Direct lengths, cosines, angles, and volume for trigonal
cell.direct_lengths[0], cell.direct_length_sigma[0] = SigmaPropagator([a_star, cos_alpha_star], [da_star, dcos_alpha_star]).evaluate(trigonal_direct_length)
cell.direct_lengths[:] = [cell.direct_lengths[0]] * 3
cell.direct_length_sigma[:] = [cell.direct_length_sigma[0]] * 3
cell.direct_cosines[0], cell.direct_cosine_sigma[0] = SigmaPropagator([cos_alpha_star], [dcos_alpha_star]).evaluate(trigonal_direct_cosine)
cell.direct_cosines[:] = [cell.direct_cosines[0]] * 3
cell.direct_cosine_sigma[:] = [cell.direct_cosine_sigma[0]] * 3
angle = acos_deg(cell.direct_cosines[0])
dangle = SigmaPropagator([cell.direct_cosines[0]], [cell.direct_cosine_sigma[0]]).evaluate(lambda v: acos_deg(v[0]))[1]
cell.direct_angles_deg[:] = [angle, angle, angle]
cell.direct_angle_sigma_deg[:] = [dangle, dangle, dangle]
cell.direct_volume, cell.direct_volume_sigma = SigmaPropagator([a_star, cos_alpha_star], [da_star, dcos_alpha_star]).evaluate(trigonal_direct_volume)
if cell.direct_volume > 0.0:
cell.reciprocal_volume = 1.0 / cell.direct_volume
cell.reciprocal_volume_sigma = cell.direct_volume_sigma / (cell.direct_volume * cell.direct_volume)
return cell
if lattice_system == 8: # Hexagonal
# Parameters: params[0] for (h^2+k^2+hk), params[1] for l^2
# For hexagonal, a*=b*, gamma*=60 (cos=0.5), alpha*=beta*=90 (cos=0)
# So, the design row is [a*^2, c*^2] related terms.
# params[0] = a*^2, params[1] = c*^2
a_star = safe_sqrt(params[0])
c_star = safe_sqrt(params[1])
da_star = 0.0 if a_star <= ANGLE_EPS else 0.5 * sigma[0] / a_star
dc_star = 0.0 if c_star <= ANGLE_EPS else 0.5 * sigma[1] / c_star
cell.reciprocal_lengths[:] = [a_star, a_star, c_star]
cell.reciprocal_length_sigma[:] = [da_star, da_star, dc_star]
# Direct lengths for hexagonal: a = 2/(sqrt(3)a*), c=1/c*
a = 0.0 if a_star <= ANGLE_EPS else 2.0 / (math.sqrt(3.0) * a_star)
c = 0.0 if c_star <= ANGLE_EPS else 1.0 / c_star
# Error propagation for f(x) = C/x, sigma_f = |f'(x)| * sigma_x = | -C/x^2 | * sigma_x = (C/x^2) * sigma_x
# For a = (2/sqrt(3)) / a*, sigma_a = (2/sqrt(3)) / a*^2 * sigma_a* = a / a* * sigma_a*
da = 0.0 if a_star <= ANGLE_EPS else da_star * a / a_star
dc = 0.0 if c_star <= ANGLE_EPS else dc_star * c / c_star
cell.direct_lengths[:] = [a, a, c]
cell.direct_length_sigma[:] = [da, da, dc]
# Angles for hexagonal
cell.direct_cosines[2] = -0.5 # cos(120 deg) for gamma
cell.reciprocal_cosines[2] = 0.5 # cos(60 deg) for gamma*
cell.direct_angles_deg[2] = 120.0
cell.reciprocal_angles_deg[2] = 60.0
# Direct volume for hexagonal V = sqrt(3)/2 * a^2 * c
cell.direct_volume = math.sqrt(3.0) * 0.5 * a * a * c
cell.direct_volume_sigma = SigmaPropagator([a, a, c], [da, da, dc]).evaluate(lambda v: math.sqrt(3.0) * 0.5 * v[0] * v[1] * v[2])[1]
if cell.direct_volume > 0.0:
cell.reciprocal_volume = 1.0 / cell.direct_volume
cell.reciprocal_volume_sigma = cell.direct_volume_sigma / (cell.direct_volume * cell.direct_volume)
return cell
raise LatticeCalculationError(f"unsupported lattice system: {lattice_system}")
# ---------- reporting ----------
[ドキュメント]
class TeeStream:
"""
ファイルとコンソールへ同時に書き込む簡易ストリーム。
詳細説明:
このクラスは、複数の `TextIO` ストリーム(例: ファイルオブジェクトと `sys.stdout`)をラップし、
`write` メソッドが呼び出された際に、登録された全てのストリームにテキストを転送します。
これにより、計算結果をファイルに保存しつつ、同時にコンソールにも表示することができます。
:param streams: TextIO: 書き込み先の `TextIO` ストリームを可変長引数で指定。
"""
def __init__(self, *streams: TextIO):
"""
TeeStreamの新しいインスタンスを初期化する。
:param streams: TextIO: 書き込み先の `TextIO` ストリームを可変長引数で指定。
"""
self.streams = streams
[ドキュメント]
def write(self, text: str) -> int:
"""
全ての登録ストリームにテキストを書き込む。
詳細説明:
`write` メソッドが呼び出されると、登録された全ての `TextIO` ストリームに対して
引数 `text` を書き込みます。また、各ストリームの `flush` メソッドも呼び出されます。
:param text: str: 書き込むテキスト。
:returns: int: 書き込まれた文字数 (入力 `text` の長さ)。
"""
for stream in self.streams:
stream.write(text)
stream.flush()
return len(text)
[ドキュメント]
def flush(self) -> None:
"""
全ての登録ストリームをフラッシュする。
詳細説明:
登録された全ての `TextIO` ストリームの `flush` メソッドを呼び出し、
バッファリングされたデータを強制的に出力します。
"""
for stream in self.streams:
stream.flush()
[ドキュメント]
def make_default_output_path(input_path: Path) -> Path:
"""
出力ファイルが未指定の場合、入力ファイルの拡張子を `.out` に置き換えたパスを生成する。
:param input_path: Path: 入力ファイルのパス。
:returns: Path: 生成された出力ファイルのパス。
"""
return input_path.with_suffix('.out')
[ドキュメント]
def lattice_system_name(code: int) -> str:
"""
格子系のコードに対応する名前を日本語と英語で返す。
:param code: int: 格子系の整数コード。
:returns: str: 格子系の名前(例: "Triclinic / 三斜晶")。
"""
names = {
1: "Triclinic / 三斜晶",
2: "Monoclinic(unique b) / 単斜晶(b軸特異)",
3: "Monoclinic(unique c) / 単斜晶(c軸特異)",
4: "Orthorhombic / 斜方晶",
5: "Tetragonal / 正方晶",
6: "Cubic / 立方晶",
7: "Rhombohedral(Trigonal) / 菱面体晶(三方晶)",
8: "Hexagonal / 六方晶",
}
return names.get(code, f"Unknown({code})")
[ドキュメント]
def lattice_parameter_labels(lattice_system: int) -> list[str]:
"""
指定された格子系の最小二乗フィットパラメータに対応するラベルのリストを返す。
詳細説明:
各格子系における結晶学的な制約に基づいて、フィットされたパラメータが何を表すかを
示す適切なラベル(例: "a*^2", "2 c* a* cos(beta*)")を提供します。
:param lattice_system: int: 格子系の種類を示す整数コード。
:returns: list[str]: パラメータラベルのリスト。
"""
if lattice_system == 1:
return [
"a*^2",
"b*^2",
"c*^2",
"2 b* c* cos(alpha*)",
"2 c* a* cos(beta*)",
"2 a* b* cos(gamma*)",
]
if lattice_system == 2:
return [
"a*^2",
"b*^2",
"c*^2",
"2 c* a* cos(beta*)",
]
if lattice_system == 3:
return [
"a*^2",
"b*^2",
"c*^2",
"2 a* b* cos(gamma*)",
]
if lattice_system == 4:
return ["a*^2", "b*^2", "c*^2"]
if lattice_system == 5:
return ["a*^2 (= b*^2)", "c*^2"]
if lattice_system == 6:
return ["a*^2 (= b*^2 = c*^2)"]
if lattice_system == 7:
return [
"a*^2", # corresponds to a*^2 in 1/d^2 = a*^2*(h^2+k^2+l^2) + ...
"2*a*^2*cos(alpha*)", # corresponds to 2*a*^2*cos(alpha*)*(kl+lh+hk) in 1/d^2 = ...
]
if lattice_system == 8:
return ["a*^2", "c*^2"] # For hexagonal, this implies a*=b*, and 1/d^2 = a*^2*(h^2+k^2+hk) + c*^2*l^2
return [f"lattice_param_{i+1}" for i in range(6)]
[ドキュメント]
def write_section_title(output: TextIO, title_en: str, title_ja: str) -> None:
"""
レポートのセクションタイトルを英語と日本語で出力する。
:param output: TextIO: 書き込み先のストリーム。
:param title_en: str: 英語のタイトル。
:param title_ja: str: 日本語のタイトル。
"""
output.write(f"\n[{title_en} / {title_ja}]\n")
[ドキュメント]
def parameter_blocks(job: JobData, fit: LeastSquaresResult) -> tuple[list[tuple[str, float, float]], list[tuple[str, float, float]]]:
"""
フィットされたパラメータを格子定数関連と補正項関連の2つのブロックに分割して返す。
詳細説明:
最小二乗法でフィットされた全パラメータを、格子系に特有のパラメータと、
有効化された補正項の係数とに分類し、それぞれのラベル、値、標準偏差の
タプルリストとして返します。
:param job: JobData: 現在の計算ジョブのデータ。
:param fit: LeastSquaresResult: 最小二乗法のフィット結果。
:returns: tuple[list[tuple[str, float, float]], list[tuple[str, float, float]]]:
格子パラメータのリストと補正項係数のリストのタプル。
"""
lattice_labels = lattice_parameter_labels(job.config.lattice_system)
n_lattice = len(lattice_labels)
lattice_part = [
(label, float(value), float(sigma))
for label, value, sigma in zip(
lattice_labels,
fit.parameters[:n_lattice],
fit.parameter_sigma[:n_lattice],
)
]
corr_meta = correction_metadata()
corr_part: list[tuple[str, float, float]] = []
corr_index = n_lattice
for flag, meta in zip(job.config.correction_flags, corr_meta):
if flag:
corr_part.append(
(
meta["name"],
float(fit.parameters[corr_index]),
float(fit.parameter_sigma[corr_index]),
)
)
corr_index += 1
return lattice_part, corr_part
[ドキュメント]
def calc_fit_statistics(fit: LeastSquaresResult) -> tuple[float, float, int]:
"""
フィットの統計情報(重み付き残差二乗和、RMS残差、自由度)を計算する。
:param fit: LeastSquaresResult: 最小二乗法のフィット結果。
:returns: tuple[float, float, int]: 重み付き残差二乗和、RMS残差、自由度のタプル。
"""
if len(fit.kept_observations) == 0:
return 0.0, 0.0, 0
w = np.array([obs.weight for obs in fit.kept_observations], dtype=float)
y = np.array([obs.transformed_position for obs in fit.kept_observations], dtype=float)
resid = fit.residuals
n_params = len(fit.parameters)
dof = max(len(y) - n_params, 0)
weighted_ss = float(np.sum(w * resid * resid))
rms = math.sqrt(float(np.mean(resid * resid))) if len(resid) else 0.0
return weighted_ss, rms, dof
[ドキュメント]
def mode_description_position(mode: int) -> str:
"""
位置変換モードのコードに対応する説明を返す。
:param mode: int: 位置変換モードの整数コード。
:returns: str: モードの説明文字列。
"""
mapping = {
1: "sin(theta) from angle in degree / 角度(deg)から sin(theta) へ変換",
2: "raw position / 生の位置値をそのまま使用",
3: "zero-shift corrected sin(theta) / ゼロシフト補正付き sin(theta)",
4: "sin(theta/2) / sin(theta/2)",
5: "lambda/(2*d)-like transform / lambda/(2*d) 型変換",
}
return mapping.get(mode, "geometry-dependent transform / 幾何条件依存の変換")
[ドキュメント]
def mode_description_weight(mode: int) -> str:
"""
重み付けモードのコードに対応する説明を返す。
:param mode: int: 重み付けモードの整数コード。
:returns: str: モードの説明文字列。
"""
mapping = {
0: "sigma(theta)-based weight / sigma(theta) ベース重み",
1: "sigma(2theta)-based weight / sigma(2theta) ベース重み",
2: "constant weight / 一定重み",
3: "user-supplied weight / ユーザー指定重み",
4: "1/sigma^2 / 1/sigma^2",
}
return mapping.get(mode, "unknown / 不明")
[ドキュメント]
def write_job_report(output: TextIO, job: JobData, fit: LeastSquaresResult, cell: CellConstants) -> None:
"""
単一の格子定数計算ジョブの結果を整形されたレポートとして出力する。
詳細説明:
この関数は、ジョブの概要、実格子定数、逆格子定数、フィットされたパラメータ、
適用された補正モデル、そして各反射の詳細なテーブル(使用されたものと除外されたもの)
を含む包括的なレポートを `output` ストリームに書き出します。
:param output: TextIO: レポートを書き出すためのテキストストリーム。
:param job: JobData: 計算されたジョブの設定と観測データ。
:param fit: LeastSquaresResult: 最小二乗法のフィット結果。
:param cell: CellConstants: 導出された格子定数。
"""
cfg = job.config
lattice_part, corr_part = parameter_blocks(job, fit)
weighted_ss, rms_resid, dof = calc_fit_statistics(fit)
output.write("\n" + "=" * 88 + "\n")
output.write(f"Job: {job.title.strip() or '(untitled job)'}\n")
output.write("=" * 88 + "\n")
write_section_title(output, "Job summary", "ジョブ概要")
output.write(f"Lattice system / 格子系 : {lattice_system_name(cfg.lattice_system)}\n")
output.write(f"Reference wavelength / 参照波長 : {job.reference_wavelength:.7f}\n")
output.write(f"Weight mode / 重みモード : {cfg.weight_mode} ({mode_description_weight(cfg.weight_mode)})\n")
output.write(f"Position mode / 位置変換モード : {cfg.position_mode} ({mode_description_position(cfg.position_mode)})\n")
output.write(f"Profile mode / プロファイルモード : {cfg.profile_mode}\n")
output.write(f"Input observations / 入力反射数 : {len(job.observations)}\n")
output.write(f"Used observations / 採用反射数 : {len(fit.kept_observations)}\n")
output.write(f"Rejected observations / 除外反射数 : {len(fit.rejected_observations)}\n")
output.write(f"Degrees of freedom / 自由度 : {dof}\n")
output.write(f"Weighted SS / 重み付き残差二乗和 : {weighted_ss:.7e}\n")
output.write(f"RMS residual in y / y空間RMS残差 : {rms_resid:.7e}\n")
write_section_title(output, "Direct lattice constants", "実格子定数")
output.write(f"a = {format_value_sigma(cell.direct_lengths[0], cell.direct_length_sigma[0])}\n")
output.write(f"b = {format_value_sigma(cell.direct_lengths[1], cell.direct_length_sigma[1])}\n")
output.write(f"c = {format_value_sigma(cell.direct_lengths[2], cell.direct_length_sigma[2])}\n")
output.write(f"alpha = {format_value_sigma(cell.direct_angles_deg[0], cell.direct_angle_sigma_deg[0])} deg\n")
output.write(f"beta = {format_value_sigma(cell.direct_angles_deg[1], cell.direct_angle_sigma_deg[1])} deg\n")
output.write(f"gamma = {format_value_sigma(cell.direct_angles_deg[2], cell.direct_angle_sigma_deg[2])} deg\n")
output.write(f"cos(a) = {format_value_sigma(cell.direct_cosines[0], cell.direct_cosine_sigma[0])}\n")
output.write(f"cos(b) = {format_value_sigma(cell.direct_cosines[1], cell.direct_cosine_sigma[1])}\n")
output.write(f"cos(c) = {format_value_sigma(cell.direct_cosines[2], cell.direct_cosine_sigma[2])}\n")
output.write(f"V = {format_value_sigma(cell.direct_volume, cell.direct_volume_sigma)}\n")
write_section_title(output, "Reciprocal lattice constants", "逆格子定数")
output.write(f"a* = {format_value_sigma(cell.reciprocal_lengths[0], cell.reciprocal_length_sigma[0])}\n")
output.write(f"b* = {format_value_sigma(cell.reciprocal_lengths[1], cell.reciprocal_length_sigma[1])}\n")
output.write(f"c* = {format_value_sigma(cell.reciprocal_lengths[2], cell.reciprocal_length_sigma[2])}\n")
output.write(f"alpha* = {format_value_sigma(cell.reciprocal_angles_deg[0], cell.reciprocal_angle_sigma_deg[0])} deg\n")
output.write(f"beta* = {format_value_sigma(cell.reciprocal_angles_deg[1], cell.reciprocal_angle_sigma_deg[1])} deg\n")
output.write(f"gamma* = {format_value_sigma(cell.reciprocal_angles_deg[2], cell.reciprocal_angle_sigma_deg[2])} deg\n")
output.write(f"cos(a*) = {format_value_sigma(cell.reciprocal_cosines[0], cell.reciprocal_cosine_sigma[0])}\n")
output.write(f"cos(b*) = {format_value_sigma(cell.reciprocal_cosines[1], cell.reciprocal_cosine_sigma[1])}\n")
output.write(f"cos(c*) = {format_value_sigma(cell.reciprocal_cosines[2], cell.reciprocal_cosine_sigma[2])}\n")
output.write(f"V* = {format_value_sigma(cell.reciprocal_volume, cell.reciprocal_volume_sigma)}\n")
write_section_title(output, "Lattice fit parameters", "格子フィットパラメータ")
for name, value, sigma in lattice_part:
output.write(f"{name:<36s} = {format_value_sigma(value, sigma)}\n")
write_section_title(output, "Correction model", "補正モデル")
output.write("s = transformed_position, theta = diffraction angle in radian / θはラジアンでの回折角\n")
output.write("The following physical meanings are tentative interpretations inferred from the legacy formulas and comments.\n")
output.write("以下の物理的意味は旧コードの式とコメントから推定した暫定解釈です。\n")
output.write("The user is responsible for confirming each interpretation for the actual experimental setup.\n")
output.write("各補正項の解釈が実験条件に対して妥当かどうかの最終確認は、ユーザーの責任で行ってください。\n")
active_corr_names = {name for name, _, _ in corr_part}
fitted_corr_map = {name: (value, sigma) for name, value, sigma in corr_part}
for flag, meta in zip(cfg.correction_flags, correction_metadata()):
state = "ON" if flag else "OFF"
output.write(f"\n{meta['name']} : {state}\n")
output.write(f" formula / 式 : {meta['formula']}\n")
output.write(f" {meta['en']}\n")
output.write(f" {meta['ja']}\n")
output.write(" This interpretation is inferred from the legacy formula and comments; the user is responsible for confirming it.\n")
output.write(" この解釈は旧コードの式とコメントからの推定であり、最終確認はユーザーの責任で行ってください。\n")
if meta["name"] in active_corr_names:
value, sigma = fitted_corr_map[meta["name"]]
output.write(f" fitted coefficient / フィット係数 : {format_value_sigma(value, sigma)}\n")
if corr_part:
write_section_title(output, "Fitted correction coefficients", "フィットされた補正係数")
for name, value, sigma in corr_part:
output.write(f"{name:<8s} = {format_value_sigma(value, sigma)}\n")
else:
write_section_title(output, "Fitted correction coefficients", "フィットされた補正係数")
output.write("No correction term is enabled. / 補正項は有効化されていません。\n")
write_section_title(output, "Reflection table", "反射一覧")
output.write(
f"{'No.':>4s} {'h':>4s} {'k':>4s} {'l':>4s} "
f"{'d_obs':>12s} {'d_calc':>12s} {'raw_pos':>12s} {'2theta_calc':>12s} "
f"{'residual_y':>14s} {'delta_2theta':>14s} {'weight':>12s}\n"
)
for obs, fitted, resid, sigma_obs in zip(
fit.kept_observations,
fit.fitted_y,
fit.residuals,
fit.observation_sigma,
):
d_obs = 1.0 / math.sqrt(max(obs.transformed_position, ANGLE_EPS))
d_calc = 1.0 / math.sqrt(max(fitted, ANGLE_EPS))
theta_calc = clamp_unit(obs.wavelength / (2.0 * d_calc)) if d_calc > 0.0 else 1.0
two_theta_calc = 2.0 * math.degrees(math.asin(theta_calc))
delta_2theta = obs.raw_position - two_theta_calc
output.write(
f"{obs.serial_index:4d} {obs.h:4d} {obs.k:4d} {obs.l:4d} "
f"{d_obs:12.6f} {d_calc:12.6f} {obs.raw_position:12.5f} {two_theta_calc:12.5f} "
f"{resid:14.6e} {delta_2theta:14.6e} {obs.weight:12.5e}\n"
)
write_section_title(output, "Rejected reflections", "除外された反射")
if fit.rejected_observations:
output.write(f"Outlier criterion / 外れ値判定基準 : |residual| > {OUTLIER_SIGMA:.1f} * sigma_obs\n")
output.write(f"{'No.':>4s} {'h':>4s} {'k':>4s} {'l':>4s} {'raw_pos':>12s} {'note':>18s}\n")
for obs in fit.rejected_observations:
output.write(
f"{obs.serial_index:4d} {obs.h:4d} {obs.k:4d} {obs.l:4d} "
f"{obs.raw_position:12.5f} {'rejected as outlier':>18s}\n"
)
else:
output.write("No reflection was rejected. / 除外された反射はありません。\n")
[ドキュメント]
def sanitize_stem(text: str) -> str:
"""
ファイル名として安全な文字列を生成するため、テキストから無効な文字を除去する。
詳細説明:
入力文字列から英数字、ハイフン、アンダースコア以外の文字をアンダースコアに置換し、
前後のアンダースコアを削除して返します。空文字列になった場合は 'job' を返します。
これにより、プラットフォーム間で互換性のあるファイル名を生成できます。
:param text: str: サニタイズする元の文字列。
:returns: str: サニタイズされた文字列。
"""
return ''.join(ch if ch.isalnum() or ch in ('-', '_') else '_' for ch in text).strip('_') or 'job'
[ドキュメント]
def plot_job_result(
job: JobData,
fit: LeastSquaresResult,
output_file: Path,
job_index: int,
*,
show_plot: bool = True,
) -> Path:
"""
各ジョブの Q-1/d プロットを生成し、画像ファイルとして保存する。
詳細説明:
観測された1/dと計算された1/dを散乱ベクトルQに対してプロットし、
誤差棒も表示します。右軸には1/dの標準偏差も表示されます。
生成されたプロットはPNG画像として保存され、オプションで表示も可能です。
:param job: JobData: プロット対象のジョブデータ。
:param fit: LeastSquaresResult: プロットに使用するフィット結果。
:param output_file: Path: レポート出力ファイル。プロットのファイル名生成に使用される。
:param job_index: int: ジョブのインデックス(ファイル名に使用)。
:param show_plot: bool: Trueの場合、プロットウィンドウを表示する。デフォルトはTrue。
:returns: Path: 保存されたプロットファイルのパス。
"""
one_over_d_calc = np.sqrt(np.maximum(fit.fitted_y, 0.0))
one_over_d_obs = np.sqrt(np.maximum([obs.transformed_position for obs in fit.kept_observations], 0.0))
sigma_one_over_d = np.zeros_like(one_over_d_calc)
valid = one_over_d_calc > ANGLE_EPS
sigma_one_over_d[valid] = fit.observation_sigma[valid] / (2.0 * one_over_d_calc[valid])
q_obs = 2.0 * math.pi * one_over_d_obs
q_min = 0.0
q_max = max(float(np.max(q_obs)) if q_obs.size else 1.0, 1.0) * 1.05
q_line = np.linspace(q_min, q_max, 400)
one_over_d_line = q_line / (2.0 * math.pi)
fig, ax_left = plt.subplots(figsize=(8, 5))
ax_left.plot(q_line, one_over_d_line, label='theory: 1/d = Q / 2π')
ax_left.errorbar(q_obs, one_over_d_calc, yerr=sigma_one_over_d, fmt='o', capsize=3, label='calculated 1/d ±σ')
ax_left.plot(q_obs, one_over_d_obs, 's', label='observed 1/d')
ax_left.set_xlabel('Scattering vector Q')
ax_left.set_ylabel('1/d')
ax_left.grid(True, alpha=0.3)
ax_right = ax_left.twinx()
ax_right.plot(q_obs, sigma_one_over_d, '^--', label='σ(1/d)')
ax_right.set_ylabel('σ(1/d)')
title = job.title.strip() or f'Job {job_index}'
ax_left.set_title(title)
handles_left, labels_left = ax_left.get_legend_handles_labels()
handles_right, labels_right = ax_right.get_legend_handles_labels()
ax_left.legend(handles_left + handles_right, labels_left + labels_right, loc='best')
fig.tight_layout()
plot_path = output_file.with_name(f"{output_file.stem}_job{job_index:02d}_{sanitize_stem(title)[:40]}.png")
fig.savefig(plot_path, dpi=150)
if show_plot:
plt.show()
plt.close(fig)
return plot_path
# ---------- CLI ----------
[ドキュメント]
def build_argument_parser() -> argparse.ArgumentParser:
"""
コマンドライン引数を解析するためのパーサーを構築する。
:returns: argparse.ArgumentParser: 構築されたArgumentParserオブジェクト。
"""
parser = argparse.ArgumentParser(description='Least-square calculation of lattice constants from reflection data')
parser.add_argument('input_file', type=Path, help='input file')
parser.add_argument('output_file', type=Path, nargs='?', help='output report file (.out by default)')
parser.add_argument('--silent', action='store_true', help='suppress banner output')
parser.add_argument('--no-show', action='store_true', help='save plots but do not open plot windows')
return parser
[ドキュメント]
def main() -> int:
"""
スクリプトのエントリポイント。
詳細説明:
コマンドライン引数を解析し、入力ファイルから格子定数計算ジョブを読み込みます。
各ジョブに対して重み付き最小二乗法を実行し、格子定数を導出します。
計算結果はレポートファイルとコンソールに表示され、各ジョブのプロットも生成・保存されます。
:returns: int: プログラムの終了コード。成功時は0。
"""
parser = build_argument_parser()
args = parser.parse_args()
output_file = args.output_file if args.output_file is not None else make_default_output_path(args.input_file)
if not args.silent:
print('Least Square Calculation for Lattice Constant')
print('Refactored Python version')
print(f'Input : {args.input_file}')
print(f'Output: {output_file}')
with args.input_file.open('r', encoding='utf-8') as fin:
jobs = parse_jobs(fin)
plot_paths: list[Path] = []
with output_file.open('w', encoding='utf-8') as fout:
tee = TeeStream(fout, sys.stdout)
tee.write('Least Square Calculation for Lattice Constant\n')
tee.write('Refactored Python version\n\n')
for idx, job in enumerate(jobs, start=1):
fit = weighted_least_squares(job.observations, tee)
cell = derive_cell_constants(job.config.lattice_system, fit.parameters, fit.parameter_sigma)
write_job_report(tee, job, fit, cell)
plot_paths.append(plot_job_result(job, fit, output_file, idx, show_plot=not args.no_show))
tee.write('\nLattice Constant Calculation was finished!\n')
if not args.silent:
print('Saved plot files:')
for path in plot_paths:
print(f' {path}')
return 0
if __name__ == '__main__':
raise SystemExit(main())