regression.tklsq.tkparamio のソースコード

"""tkparamio.py

フィッティングパラメータ CSV の読み書きと補助関数を提供するモジュール。

このモジュールは、フィッティングパラメータをCSVファイル形式で管理するための機能を提供します。
パラメータの読み書き、デフォルト値からのCSV生成、値の更新、最適化対象のパラメータ抽出、
境界値の取得、境界ペナルティの計算、表示用文字列への整形、および範囲外警告の生成など、
多岐にわたる補助関数が含まれています。

CSV columns:
    varname,optid,optid_lin,p0,pmin,pmax,kpenalty

optional columns:
    stderr, note, unit など

方針:
- optid=1      : 非線形最適化対象
- optid_lin=1  : 線形最適化対象
- pmin/pmax が有限値なら、kpenalty * deviation^2 で範囲ペナルティを加える

:doc:`tkparamio_usage`
"""

from __future__ import annotations

from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
import csv
import math
import numpy as np


ParamRow = Dict[str, object]
ParamTable = Dict[str, ParamRow]

REQUIRED_COLUMNS = ["varname", "optid", "optid_lin", "p0", "pmin", "pmax", "kpenalty"]
DEFAULT_COLUMNS = REQUIRED_COLUMNS + ["stderr"]


[ドキュメント] def parse_float(value, default: float = np.nan) -> float: """値を浮動小数点数に変換し、空欄やNoneの場合はデフォルト値を適用します。 文字列の空白をトリムし、空の場合は `default` を返します。 :param value: 変換対象の値。 :param default: 変換に失敗した場合や空欄の場合に返すデフォルト値。デフォルトは `np.nan`。 :returns: 変換された浮動小数点数。 """ if value is None: return default s = str(value).strip() if s == "": return default return float(s)
[ドキュメント] def parse_int(value, default: int = 0) -> int: """値を整数に変換し、空欄やNoneの場合はデフォルト値を適用します。 文字列の空白をトリムし、空の場合は `default` を返します。 浮動小数点数として解釈可能な文字列も整数に変換します。 :param value: 変換対象の値。 :param default: 変換に失敗した場合や空欄の場合に返すデフォルト値。デフォルトは0。 :returns: 変換された整数。 """ if value is None: return default s = str(value).strip() if s == "": return default return int(float(s))
[ドキュメント] def normalize_param_row(row: Mapping[str, object]) -> ParamRow: """パラメータの行データを標準形式に正規化します。 `varname` の存在を確認し、必須カラム (`optid`, `optid_lin`, `p0`, `pmin`, `pmax`, `kpenalty`) の値を適切な型に変換します。`stderr` が存在する場合はこれも変換します。 余分な列は保持されます。 :param row: 変換するパラメータの行データ(辞書形式)。 :returns: 正規化されたパラメータ行データ。 :raises ValueError: `varname` が存在しない、または空の場合に発生します。 """ if "varname" not in row: raise ValueError("parameter row must contain 'varname'") name = str(row["varname"]).strip() if not name: raise ValueError("empty parameter name") out: ParamRow = dict(row) out["varname"] = name out["optid"] = parse_int(row.get("optid", 0), 0) out["optid_lin"] = parse_int(row.get("optid_lin", 0), 0) out["p0"] = parse_float(row.get("p0", 0.0), 0.0) out["pmin"] = parse_float(row.get("pmin", np.nan), np.nan) out["pmax"] = parse_float(row.get("pmax", np.nan), np.nan) out["kpenalty"] = parse_float(row.get("kpenalty", 1.0e6), 1.0e6) if "stderr" in out: out["stderr"] = parse_float(out.get("stderr"), np.nan) return out
[ドキュメント] def rows_from_defaults(defaults: Sequence[Mapping[str, object]]) -> ParamTable: """デフォルトパラメータ定義のリストから ParamTable を作成します。 与えられた各行を `normalize_param_row` で正規化し、`varname` をキーとして `ParamTable` に格納します。 :param defaults: デフォルトパラメータの行データのシーケンス。 :returns: パラメータ名がキーの `ParamTable`。 """ params: ParamTable = {} for row in defaults: r = normalize_param_row(row) params[str(r["varname"])] = r return params
[ドキュメント] def create_param_csv( path: Union[str, Path], defaults: Sequence[Mapping[str, object]], *, overwrite: bool = False, columns: Optional[Sequence[str]] = None, ) -> None: """デフォルト値を使用してパラメータCSVファイルを作成します。 指定されたパスにCSVファイルを作成します。ファイルが既に存在し `overwrite` が `False` の場合、ファイルは作成されません。`columns` が指定されない場合、 デフォルトカラムと既存の追加カラムが自動的に含まれます。 :param path: 出力するCSVファイルのパス。 :param defaults: CSVに書き込むデフォルトパラメータのリスト。 :param overwrite: ファイルが既に存在する場合に上書きするかどうか。デフォルトは `False`。 :param columns: CSVのヘッダーとして使用するカラム名のシーケンス。指定しない場合は自動決定されます。 :returns: None """ path = Path(path) if path.exists() and not overwrite: return params = rows_from_defaults(defaults) path.parent.mkdir(parents=True, exist_ok=True) if columns is None: extra = [] for row in params.values(): for k in row.keys(): if k not in DEFAULT_COLUMNS and k not in extra: extra.append(k) columns = DEFAULT_COLUMNS + extra with path.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=list(columns)) writer.writeheader() for row in params.values(): writer.writerow(_row_for_csv(row, columns))
[ドキュメント] def read_param_csv( path: Union[str, Path], *, defaults: Optional[Sequence[Mapping[str, object]]] = None, create_if_missing: bool = True, ) -> ParamTable: """パラメータCSVファイルを読み込み、`ParamTable` を返します。 CSVファイルが存在しない場合、`defaults` が提供されており `create_if_missing` が `True` であれば、`create_param_csv` を使用してファイルを自動生成してから読み込みます。 必須カラムの存在が検証されます。 :param path: 読み込むCSVファイルのパス。 :param defaults: CSVファイルが存在しない場合に、ファイルの自動生成に使用するデフォルトパラメータのリスト。 :param create_if_missing: CSVファイルが存在しない場合に自動生成するかどうか。デフォルトは `True`。 :returns: 読み込まれたパラメータデータを含む `ParamTable`。 :raises FileNotFoundError: ファイルが存在せず、自動生成も行われない場合に発生します。 :raises ValueError: CSVファイルが空であるか、必須カラムが不足している場合に発生します。 """ path = Path(path) if not path.exists(): if defaults is None or not create_if_missing: raise FileNotFoundError(path) create_param_csv(path, defaults, overwrite=False) with path.open("r", newline="", encoding="utf-8") as f: reader = csv.DictReader(f) if reader.fieldnames is None: raise ValueError(f"empty parameter CSV: {path}") missing = set(REQUIRED_COLUMNS) - set(reader.fieldnames) if missing: raise ValueError(f"parameter CSV missing columns: {sorted(missing)}") params: ParamTable = {} for row in reader: r = normalize_param_row(row) name = str(r["varname"]) params[name] = r return params
[ドキュメント] def write_param_csv( path: Union[str, Path], params: Mapping[str, Mapping[str, object]], *, values: Optional[Mapping[str, float]] = None, stderr: Optional[Mapping[str, Optional[float]]] = None, columns: Optional[Sequence[str]] = None, ) -> None: """パラメータデータをCSVファイルに保存します。 指定されたパスにCSVファイルを書き込みます。`values` が提供された場合、 対応するパラメータの `p0` 値が更新されます。`stderr` が提供された場合、 対応する `stderr` 列が更新されます。`columns` が指定されない場合、 デフォルトカラムと既存の追加カラムが自動的に含まれます。 :param path: 出力するCSVファイルのパス。 :param params: 保存するパラメータデータを含む `ParamTable`。 :param values: 更新するパラメータ名とその新しい値のマッピング。対応する `p0` が上書きされます。 :param stderr: オプションで更新するパラメータ名とその新しい標準誤差のマッピング。 対応する `stderr` が上書きされます。`None` を指定すると `np.nan` として扱われます。 :param columns: CSVのヘッダーとして使用するカラム名のシーケンス。指定しない場合は自動決定されます。 :returns: None """ path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) normalized = {name: normalize_param_row(row) for name, row in params.items()} if values is not None: for name, v in values.items(): if name in normalized: normalized[name]["p0"] = float(v) if stderr is not None: for name, se in stderr.items(): if name in normalized: normalized[name]["stderr"] = np.nan if se is None else float(se) if columns is None: extra = [] for row in normalized.values(): for k in row.keys(): if k not in DEFAULT_COLUMNS and k not in extra: extra.append(k) columns = DEFAULT_COLUMNS + extra with path.open("w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=list(columns)) writer.writeheader() for row in normalized.values(): writer.writerow(_row_for_csv(row, columns))
def _row_for_csv(row: Mapping[str, object], columns: Sequence[str]) -> Dict[str, object]: """NaN を空欄として CSV 出力用にパラメータ行データを整形します。 `numpy.nan` (非有限浮動小数点数) の値を空文字列に変換し、指定されたカラムのみを 含む辞書を生成します。これは内部補助関数です。 :param row: 整形するパラメータ行データ。 :param columns: 出力に含めるカラム名のシーケンス。 :returns: CSV出力用に整形された辞書。 """ out = {} for k in columns: v = row.get(k, "") if isinstance(v, float) and not np.isfinite(v): out[k] = "" else: out[k] = v return out
[ドキュメント] def values_from_params(params: Mapping[str, Mapping[str, object]]) -> Dict[str, float]: """`ParamTable` からパラメータ名とその初期値 (`p0`) の辞書を作成します。 各パラメータ行の `p0` 値を浮動小数点数として抽出します。 :param params: `ParamTable` 形式のパラメータデータ。 :returns: パラメータ名がキー、`p0` 値が値の辞書。 """ return {name: float(row["p0"]) for name, row in params.items()}
[ドキュメント] def update_param_values( params: ParamTable, values: Mapping[str, float], *, inplace: bool = False, ) -> ParamTable: """`ParamTable` 内のパラメータの初期値 (`p0`) を更新します。 `values` で指定されたパラメータの `p0` 値を更新します。`inplace` が `True` の場合、 元の `ParamTable` が変更されます。`False` の場合、新しい `ParamTable` が作成されて返されます。 :param params: 更新対象の `ParamTable`。 :param values: 更新するパラメータ名と新しい値のマッピング。 :param inplace: 更新をインプレースで行うかどうか。デフォルトは `False`。 :returns: 更新された `ParamTable`。 """ out = params if inplace else {k: dict(v) for k, v in params.items()} for name, value in values.items(): if name in out: out[name]["p0"] = float(value) return out
[ドキュメント] def nonlinear_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]: """非線形最適化の対象となるパラメータ名(`optid=1`)のリストを返します。 `ParamTable` 内の各パラメータ行をチェックし、`optid` が `1` に設定されている パラメータ名のみを抽出します。 :param params: `ParamTable` 形式のパラメータデータ。 :returns: 非線形最適化対象のパラメータ名のリスト。 """ return [name for name, row in params.items() if int(row.get("optid", 0)) == 1]
[ドキュメント] def linear_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]: """線形最適化の対象となるパラメータ名(`optid_lin=1`)のリストを返します。 `ParamTable` 内の各パラメータ行をチェックし、`optid_lin` が `1` に設定されている パラメータ名のみを抽出します。 :param params: `ParamTable` 形式のパラメータデータ。 :returns: 線形最適化対象のパラメータ名のリスト。 """ return [name for name, row in params.items() if int(row.get("optid_lin", 0)) == 1]
[ドキュメント] def free_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]: """非線形または線形最適化の対象となるすべてのパラメータ名のリストを返します。 `ParamTable` 内の各パラメータ行をチェックし、`optid` または `optid_lin` の いずれかが `1` に設定されているパラメータ名のみを抽出します。 :param params: `ParamTable` 形式のパラメータデータ。 :returns: 非線形または線形最適化対象のパラメータ名のリスト。 """ names = [] for name, row in params.items(): if int(row.get("optid", 0)) == 1 or int(row.get("optid_lin", 0)) == 1: names.append(name) return names
[ドキュメント] def fixed_param_names(params: Mapping[str, Mapping[str, object]]) -> List[str]: """最適化対象外となる固定パラメータ名(`optid=0` かつ `optid_lin=0`)のリストを返します。 `ParamTable` 内の各パラメータ行をチェックし、`optid` と `optid_lin` の両方が `0` に設定されているパラメータ名のみを抽出します。 :param params: `ParamTable` 形式のパラメータデータ。 :returns: 固定パラメータ名のリスト。 """ names = [] for name, row in params.items(): if int(row.get("optid", 0)) == 0 and int(row.get("optid_lin", 0)) == 0: names.append(name) return names
[ドキュメント] def bounds_from_params( params: Mapping[str, Mapping[str, object]], names: Optional[Sequence[str]] = None, ) -> Dict[str, Tuple[float, float]]: """パラメータの最小値 (`pmin`) と最大値 (`pmax`) から境界値の辞書を作成します。 各パラメータの `pmin` と `pmax` を抽出し、`(-inf, inf)` をデフォルトとして 境界タプル `(lo, hi)` を作成します。`names` が指定された場合、そのパラメータのみが対象となります。 :param params: `ParamTable` 形式のパラメータデータ。 :param names: 境界値を取得するパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。 :returns: パラメータ名がキー、`(pmin, pmax)` タプルが値の辞書。 """ if names is None: names = list(params.keys()) out = {} for name in names: row = params[name] lo = float(row.get("pmin", np.nan)) hi = float(row.get("pmax", np.nan)) if not np.isfinite(lo): lo = -np.inf if not np.isfinite(hi): hi = np.inf out[name] = (lo, hi) return out
[ドキュメント] def bounds_penalty( params: Mapping[str, Mapping[str, object]], values: Mapping[str, float], *, names: Optional[Sequence[str]] = None, ) -> float: """パラメータが指定された境界 (`pmin`, `pmax`) から逸脱した場合の二乗ペナルティを計算します。 各パラメータについて、現在の値 (`values`) が `pmin` または `pmax` の範囲外にある場合に、 `kpenalty * deviation^2` の形式でペナルティを計算し、その合計を返します。 `pmin` や `pmax` が非有限値の場合は、その境界は適用されません。 :param params: `ParamTable` 形式のパラメータデータ(`pmin`, `pmax`, `kpenalty` を含む)。 :param values: 現在のパラメータ値。 :param names: ペナルティ計算の対象とするパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。 :returns: 計算された合計ペナルティ値。 """ if names is None: names = list(params.keys()) penalty = 0.0 for name in names: if name not in params or name not in values: continue row = params[name] value = float(values[name]) pmin = float(row.get("pmin", np.nan)) pmax = float(row.get("pmax", np.nan)) kpenalty = float(row.get("kpenalty", 1.0e6)) if np.isfinite(pmin) and value < pmin: penalty += kpenalty * (value - pmin) ** 2 if np.isfinite(pmax) and value > pmax: penalty += kpenalty * (value - pmax) ** 2 return float(penalty)
[ドキュメント] def format_params( values: Mapping[str, float], *, stderr: Optional[Mapping[str, Optional[float]]] = None, names: Optional[Sequence[str]] = None, title: Optional[str] = None, ) -> str: """パラメータと標準誤差を整形された文字列として出力します。 各パラメータについて、値とオプションで標準誤差を組み合わせた行を生成します。 標準誤差が利用できない、または非有限値の場合、値のみが表示されます。 `names` が指定された場合、そのパラメータのみが対象となり、指定された順序で表示されます。 `title` を追加できます。 :param values: パラメータ名とその値のマッピング。 :param stderr: オプションでパラメータ名とその標準誤差のマッピング。 `None` の場合は標準誤差は表示されません。 :param names: 表示するパラメータ名のシーケンス。指定しない場合は `values` のすべてのキーが対象になります。 :param title: 出力文字列の冒頭に追加するタイトル。 :returns: 整形されたパラメータ情報の複数行文字列。 """ if names is None: names = list(values.keys()) lines: List[str] = [] if title: lines.append(title) for name in names: v = float(values[name]) if stderr is None or name not in stderr or stderr[name] is None: lines.append(f"{name:16s} = {v: .10g}") else: se = stderr[name] if se is None or not np.isfinite(float(se)): lines.append(f"{name:16s} = {v: .10g}") else: lines.append(f"{name:16s} = {v: .10g} ± {float(se):.3g}") return "\n".join(lines)
[ドキュメント] def range_warnings( params: Mapping[str, Mapping[str, object]], values: Mapping[str, float], *, names: Optional[Sequence[str]] = None, ) -> List[str]: """パラメータが指定された境界 (`pmin`, `pmax`) から逸脱している場合の警告メッセージのリストを返します。 各パラメータについて、現在の値 (`values`) が `pmin` または `pmax` の範囲外にある場合に、 警告文字列を生成します。`pmin` や `pmax` が非有限値の場合は、その境界は適用されません。 :param params: `ParamTable` 形式のパラメータデータ(`pmin`, `pmax` を含む)。 :param values: 現在のパラメータ値。 :param names: 警告チェックの対象とするパラメータ名のシーケンス。指定しない場合はすべてのパラメータが対象になります。 :returns: 範囲外パラメータに対する警告メッセージのリスト。 """ if names is None: names = list(params.keys()) warnings = [] for name in names: if name not in params or name not in values: continue row = params[name] v = float(values[name]) pmin = float(row.get("pmin", np.nan)) pmax = float(row.get("pmax", np.nan)) if np.isfinite(pmin) and v < pmin: warnings.append(f"{name}: value={v:g} < pmin={pmin:g}") if np.isfinite(pmax) and v > pmax: warnings.append(f"{name}: value={v:g} > pmax={pmax:g}") return warnings