"""tkparamio.py
フィッティングパラメータ CSV の読み書きと補助関数を提供するモジュール。
このモジュールは、フィッティングパラメータをCSVファイル形式で管理するための機能を提供します。
パラメータの読み書き、デフォルト値からのCSV生成、値の更新、最適化対象のパラメータ抽出、
境界値の取得、境界ペナルティの計算、表示用文字列への整形、および範囲外警告の生成など、
多岐にわたる補助関数が含まれています。
CSV columns:
varname,optid,optid_lin,p0,pmin,pmax,kpenalty
optional columns:
stderr, note, unit など
方針:
- optid=1 : 非線形最適化対象
- optid_lin=1 : 線形最適化対象
- pmin/pmax が有限値なら、kpenalty * deviation^2 で範囲ペナルティを加える
:doc:`tkparamio_usage`
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
import csv
import math
import numpy as np
ParamRow = Dict[str, object]
ParamTable = Dict[str, ParamRow]
REQUIRED_COLUMNS = ["varname", "optid", "optid_lin", "p0", "pmin", "pmax", "kpenalty"]
DEFAULT_COLUMNS = REQUIRED_COLUMNS + ["stderr"]
[ドキュメント]
def parse_float(value, default: float = np.nan) -> float:
"""値を浮動小数点数に変換し、空欄やNoneの場合はデフォルト値を適用します。
文字列の空白をトリムし、空の場合は `default` を返します。
:param value: 変換対象の値。
:param default: 変換に失敗した場合や空欄の場合に返すデフォルト値。デフォルトは `np.nan`。
:returns: 変換された浮動小数点数。
"""
if value is None:
return default
s = str(value).strip()
if s == "":
return default
return float(s)
[ドキュメント]
def parse_int(value, default: int = 0) -> int:
"""値を整数に変換し、空欄やNoneの場合はデフォルト値を適用します。
文字列の空白をトリムし、空の場合は `default` を返します。
浮動小数点数として解釈可能な文字列も整数に変換します。
:param value: 変換対象の値。
:param default: 変換に失敗した場合や空欄の場合に返すデフォルト値。デフォルトは0。
:returns: 変換された整数。
"""
if value is None:
return default
s = str(value).strip()
if s == "":
return default
return int(float(s))
[ドキュメント]
def normalize_param_row(row: Mapping[str, object]) -> ParamRow:
"""パラメータの行データを標準形式に正規化します。
`varname` の存在を確認し、必須カラム (`optid`, `optid_lin`, `p0`, `pmin`, `pmax`,
`kpenalty`) の値を適切な型に変換します。`stderr` が存在する場合はこれも変換します。
余分な列は保持されます。
:param row: 変換するパラメータの行データ(辞書形式)。
:returns: 正規化されたパラメータ行データ。
:raises ValueError: `varname` が存在しない、または空の場合に発生します。
"""
if "varname" not in row:
raise ValueError("parameter row must contain 'varname'")
name = str(row["varname"]).strip()
if not name:
raise ValueError("empty parameter name")
out: ParamRow = dict(row)
out["varname"] = name
out["optid"] = parse_int(row.get("optid", 0), 0)
out["optid_lin"] = parse_int(row.get("optid_lin", 0), 0)
out["p0"] = parse_float(row.get("p0", 0.0), 0.0)
out["pmin"] = parse_float(row.get("pmin", np.nan), np.nan)
out["pmax"] = parse_float(row.get("pmax", np.nan), np.nan)
out["kpenalty"] = parse_float(row.get("kpenalty", 1.0e6), 1.0e6)
if "stderr" in out:
out["stderr"] = parse_float(out.get("stderr"), np.nan)
return out
[ドキュメント]
def rows_from_defaults(defaults: Sequence[Mapping[str, object]]) -> ParamTable:
"""デフォルトパラメータ定義のリストから ParamTable を作成します。
与えられた各行を `normalize_param_row` で正規化し、`varname` をキーとして
`ParamTable` に格納します。
:param defaults: デフォルトパラメータの行データのシーケンス。
:returns: パラメータ名がキーの `ParamTable`。
"""
params: ParamTable = {}
for row in defaults:
r = normalize_param_row(row)
params[str(r["varname"])] = r
return params
[ドキュメント]
def create_param_csv(
path: Union[str, Path],
defaults: Sequence[Mapping[str, object]],
*,
overwrite: bool = False,
columns: Optional[Sequence[str]] = None,
) -> None:
"""デフォルト値を使用してパラメータCSVファイルを作成します。
指定されたパスにCSVファイルを作成します。ファイルが既に存在し `overwrite` が
`False` の場合、ファイルは作成されません。`columns` が指定されない場合、
デフォルトカラムと既存の追加カラムが自動的に含まれます。
:param path: 出力するCSVファイルのパス。
:param defaults: CSVに書き込むデフォルトパラメータのリスト。
:param overwrite: ファイルが既に存在する場合に上書きするかどうか。デフォルトは `False`。
:param columns: CSVのヘッダーとして使用するカラム名のシーケンス。指定しない場合は自動決定されます。
:returns: None
"""
path = Path(path)
if path.exists() and not overwrite:
return
params = rows_from_defaults(defaults)
path.parent.mkdir(parents=True, exist_ok=True)
if columns is None:
extra = []
for row in params.values():
for k in row.keys():
if k not in DEFAULT_COLUMNS and k not in extra:
extra.append(k)
columns = DEFAULT_COLUMNS + extra
with path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=list(columns))
writer.writeheader()
for row in params.values():
writer.writerow(_row_for_csv(row, columns))
[ドキュメント]
def read_param_csv(
path: Union[str, Path],
*,
defaults: Optional[Sequence[Mapping[str, object]]] = None,
create_if_missing: bool = True,
) -> ParamTable:
"""パラメータCSVファイルを読み込み、`ParamTable` を返します。
CSVファイルが存在しない場合、`defaults` が提供されており `create_if_missing` が
`True` であれば、`create_param_csv` を使用してファイルを自動生成してから読み込みます。
必須カラムの存在が検証されます。
:param path: 読み込むCSVファイルのパス。
:param defaults: CSVファイルが存在しない場合に、ファイルの自動生成に使用するデフォルトパラメータのリスト。
:param create_if_missing: CSVファイルが存在しない場合に自動生成するかどうか。デフォルトは `True`。
:returns: 読み込まれたパラメータデータを含む `ParamTable`。
:raises FileNotFoundError: ファイルが存在せず、自動生成も行われない場合に発生します。
:raises ValueError: CSVファイルが空であるか、必須カラムが不足している場合に発生します。
"""
path = Path(path)
if not path.exists():
if defaults is None or not create_if_missing:
raise FileNotFoundError(path)
create_param_csv(path, defaults, overwrite=False)
with path.open("r", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
if reader.fieldnames is None:
raise ValueError(f"empty parameter CSV: {path}")
missing = set(REQUIRED_COLUMNS) - set(reader.fieldnames)
if missing:
raise ValueError(f"parameter CSV missing columns: {sorted(missing)}")
params: ParamTable = {}
for row in reader:
r = normalize_param_row(row)
name = str(r["varname"])
params[name] = r
return params
[ドキュメント]
def write_param_csv(
path: Union[str, Path],
params: Mapping[str, Mapping[str, object]],
*,
values: Optional[Mapping[str, float]] = None,
stderr: Optional[Mapping[str, Optional[float]]] = None,
columns: Optional[Sequence[str]] = None,
) -> None:
"""パラメータデータをCSVファイルに保存します。
指定されたパスにCSVファイルを書き込みます。`values` が提供された場合、
対応するパラメータの `p0` 値が更新されます。`stderr` が提供された場合、
対応する `stderr` 列が更新されます。`columns` が指定されない場合、
デフォルトカラムと既存の追加カラムが自動的に含まれます。
:param path: 出力するCSVファイルのパス。
:param params: 保存するパラメータデータを含む `ParamTable`。
:param values: 更新するパラメータ名とその新しい値のマッピング。対応する `p0` が上書きされます。
:param stderr: オプションで更新するパラメータ名とその新しい標準誤差のマッピング。
対応する `stderr` が上書きされます。`None` を指定すると `np.nan` として扱われます。
:param columns: CSVのヘッダーとして使用するカラム名のシーケンス。指定しない場合は自動決定されます。
:returns: None
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
normalized = {name: normalize_param_row(row) for name, row in params.items()}
if values is not None:
for name, v in values.items():
if name in normalized:
normalized[name]["p0"] = float(v)
if stderr is not None:
for name, se in stderr.items():
if name in normalized:
normalized[name]["stderr"] = np.nan if se is None else float(se)
if columns is None:
extra = []
for row in normalized.values():
for k in row.keys():
if k not in DEFAULT_COLUMNS and k not in extra:
extra.append(k)
columns = DEFAULT_COLUMNS + extra
with path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=list(columns))
writer.writeheader()
for row in normalized.values():
writer.writerow(_row_for_csv(row, columns))
def _row_for_csv(row: Mapping[str, object], columns: Sequence[str]) -> Dict[str, object]:
"""NaN を空欄として CSV 出力用にパラメータ行データを整形します。
`numpy.nan` (非有限浮動小数点数) の値を空文字列に変換し、指定されたカラムのみを
含む辞書を生成します。これは内部補助関数です。
:param row: 整形するパラメータ行データ。
:param columns: 出力に含めるカラム名のシーケンス。
:returns: CSV出力用に整形された辞書。
"""
out = {}
for k in columns:
v = row.get(k, "")
if isinstance(v, float) and not np.isfinite(v):
out[k] = ""
else:
out[k] = v
return out
[ドキュメント]
def values_from_params(params: Mapping[str, Mapping[str, object]]) -> Dict[str, float]:
"""`ParamTable` からパラメータ名とその初期値 (`p0`) の辞書を作成します。
各パラメータ行の `p0` 値を浮動小数点数として抽出します。
:param params: `ParamTable` 形式のパラメータデータ。
:returns: パラメータ名がキー、`p0` 値が値の辞書。
"""
return {name: float(row["p0"]) for name, row in params.items()}
[ドキュメント]
def update_param_values(
params: ParamTable,
values: Mapping[str, float],
*,
inplace: bool = False,
) -> ParamTable:
"""`ParamTable` 内のパラメータの初期値 (`p0`) を更新します。
`values` で指定されたパラメータの `p0` 値を更新します。`inplace` が `True` の場合、
元の `ParamTable` が変更されます。`False` の場合、新しい `ParamTable` が作成されて返されます。
:param params: 更新対象の `ParamTable`。
:param values: 更新するパラメータ名と新しい値のマッピング。
:param inplace: 更新をインプレースで行うかどうか。デフォルトは `False`。
:returns: 更新された `ParamTable`。
"""
out = params if inplace else {k: dict(v) for k, v in params.items()}
for name, value in values.items():
if name in out:
out[name]["p0"] = float(value)
return out
[ドキュメント]
def nonlinear_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]:
"""非線形最適化の対象となるパラメータ名(`optid=1`)のリストを返します。
`ParamTable` 内の各パラメータ行をチェックし、`optid` が `1` に設定されている
パラメータ名のみを抽出します。
:param params: `ParamTable` 形式のパラメータデータ。
:returns: 非線形最適化対象のパラメータ名のリスト。
"""
return [name for name, row in params.items() if int(row.get("optid", 0)) == 1]
[ドキュメント]
def linear_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]:
"""線形最適化の対象となるパラメータ名(`optid_lin=1`)のリストを返します。
`ParamTable` 内の各パラメータ行をチェックし、`optid_lin` が `1` に設定されている
パラメータ名のみを抽出します。
:param params: `ParamTable` 形式のパラメータデータ。
:returns: 線形最適化対象のパラメータ名のリスト。
"""
return [name for name, row in params.items() if int(row.get("optid_lin", 0)) == 1]
[ドキュメント]
def free_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]:
"""非線形または線形最適化の対象となるすべてのパラメータ名のリストを返します。
`ParamTable` 内の各パラメータ行をチェックし、`optid` または `optid_lin` の
いずれかが `1` に設定されているパラメータ名のみを抽出します。
:param params: `ParamTable` 形式のパラメータデータ。
:returns: 非線形または線形最適化対象のパラメータ名のリスト。
"""
names = []
for name, row in params.items():
if int(row.get("optid", 0)) == 1 or int(row.get("optid_lin", 0)) == 1:
names.append(name)
return names
[ドキュメント]
def fixed_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]:
"""最適化対象外となる固定パラメータ名(`optid=0` かつ `optid_lin=0`)のリストを返します。
`ParamTable` 内の各パラメータ行をチェックし、`optid` と `optid_lin` の両方が
`0` に設定されているパラメータ名のみを抽出します。
:param params: `ParamTable` 形式のパラメータデータ。
:returns: 固定パラメータ名のリスト。
"""
names = []
for name, row in params.items():
if int(row.get("optid", 0)) == 0 and int(row.get("optid_lin", 0)) == 0:
names.append(name)
return names
[ドキュメント]
def bounds_from_params(
params: Mapping[str, Mapping[str, object]],
names: Optional[Sequence[str]] = None,
) -> Dict[str, Tuple[float, float]]:
"""パラメータの最小値 (`pmin`) と最大値 (`pmax`) から境界値の辞書を作成します。
各パラメータの `pmin` と `pmax` を抽出し、`(-inf, inf)` をデフォルトとして
境界タプル `(lo, hi)` を作成します。`names` が指定された場合、そのパラメータのみが対象となります。
:param params: `ParamTable` 形式のパラメータデータ。
:param names: 境界値を取得するパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。
:returns: パラメータ名がキー、`(pmin, pmax)` タプルが値の辞書。
"""
if names is None:
names = list(params.keys())
out = {}
for name in names:
row = params[name]
lo = float(row.get("pmin", np.nan))
hi = float(row.get("pmax", np.nan))
if not np.isfinite(lo):
lo = -np.inf
if not np.isfinite(hi):
hi = np.inf
out[name] = (lo, hi)
return out
[ドキュメント]
def bounds_penalty(
params: Mapping[str, Mapping[str, object]],
values: Mapping[str, float],
*,
names: Optional[Sequence[str]] = None,
) -> float:
"""パラメータが指定された境界 (`pmin`, `pmax`) から逸脱した場合の二乗ペナルティを計算します。
各パラメータについて、現在の値 (`values`) が `pmin` または `pmax` の範囲外にある場合に、
`kpenalty * deviation^2` の形式でペナルティを計算し、その合計を返します。
`pmin` や `pmax` が非有限値の場合は、その境界は適用されません。
:param params: `ParamTable` 形式のパラメータデータ(`pmin`, `pmax`, `kpenalty` を含む)。
:param values: 現在のパラメータ値。
:param names: ペナルティ計算の対象とするパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。
:returns: 計算された合計ペナルティ値。
"""
if names is None:
names = list(params.keys())
penalty = 0.0
for name in names:
if name not in params or name not in values:
continue
row = params[name]
value = float(values[name])
pmin = float(row.get("pmin", np.nan))
pmax = float(row.get("pmax", np.nan))
kpenalty = float(row.get("kpenalty", 1.0e6))
if np.isfinite(pmin) and value < pmin:
penalty += kpenalty * (value - pmin) ** 2
if np.isfinite(pmax) and value > pmax:
penalty += kpenalty * (value - pmax) ** 2
return float(penalty)
[ドキュメント]
def range_warnings(
params: Mapping[str, Mapping[str, object]],
values: Mapping[str, float],
*,
names: Optional[Sequence[str]] = None,
) -> List[str]:
"""パラメータが指定された境界 (`pmin`, `pmax`) から逸脱している場合の警告メッセージのリストを返します。
各パラメータについて、現在の値 (`values`) が `pmin` または `pmax` の範囲外にある場合に、
警告文字列を生成します。`pmin` や `pmax` が非有限値の場合は、その境界は適用されません。
:param params: `ParamTable` 形式のパラメータデータ(`pmin`, `pmax` を含む)。
:param values: 現在のパラメータ値。
:param names: 警告チェックの対象とするパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。
:returns: 範囲外パラメータに対する警告メッセージのリスト。
"""
if names is None:
names = list(params.keys())
warnings = []
for name in names:
if name not in params or name not in values:
continue
row = params[name]
v = float(values[name])
pmin = float(row.get("pmin", np.nan))
pmax = float(row.get("pmax", np.nan))
if np.isfinite(pmin) and v < pmin:
warnings.append(f"{name}: value={v:g} < pmin={pmin:g}")
if np.isfinite(pmax) and v > pmax:
warnings.append(f"{name}: value={v:g} > pmax={pmax:g}")
return warnings