"""
tkdataio.py
~~~~~~~~~~~
軽量な表データ入出力ユーティリティ。
既存の tkVariousData/tkFit を置き換える目的ではなく、独立した小さな解析スクリプトを
素早く作るための補助モジュール。
:doc:`tkdataio_usage`
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, Mapping, Optional, Sequence, Tuple, Union
import json
import numpy as np
import pandas as pd # pandasが常にインポートされることを前提として追加。実際にはread_table内でtry-exceptされているが、Docstringの型ヒントで必要になるため。
ColumnKey = Union[int, str]
[ドキュメント]
def read_table(path: Union[str, Path], *, sheet_name=0) -> pd.DataFrame:
"""CSV/TSV/TXT/XLSX ファイルを pandas.DataFrame として読み込みます。
指定されたファイルの拡張子に基づき、適切なpandasの読み込み関数を使用して表データをDataFrameとして読み込みます。
対応フォーマットはCSV (.csv)、TSV/TXT/DAT (.tsv, .txt, .dat)、XLSX/XLSM/XLS (.xlsx, .xlsm, .xls) です。
:param path: Union[str, Path] 読み込むファイルのパス。
:param sheet_name: Union[int, str] Excelファイルの場合に読み込むシートの名前またはインデックス。デフォルトは最初のシート (0)。
:returns: pandas.DataFrame 読み込まれた表データ。
:raises ImportError: pandasまたはopenpyxlがインストールされていない場合に発生します。
:raises ValueError: サポートされていないファイル形式の場合に発生します。
"""
try:
import pandas as pd
except Exception as exc: # pragma: no cover
raise ImportError("read_table requires pandas. Install with: pip install pandas openpyxl") from exc
path = Path(path)
ext = path.suffix.lower()
if ext == ".csv":
return pd.read_csv(path)
if ext in [".tsv", ".txt", ".dat"]:
# まずタブ、失敗しやすい空白区切りはユーザ側で pandas を直接使う想定
return pd.read_csv(path, sep="\t")
if ext in [".xlsx", ".xlsm", ".xls"]:
return pd.read_excel(path, sheet_name=sheet_name)
raise ValueError(f"Unsupported table format: {path.suffix}")
def _column_from_key(df: pd.DataFrame, key: ColumnKey) -> Tuple[str, np.ndarray]:
"""DataFrame から列をインデックスまたはラベルで取り出します。
指定されたキーが整数の場合、列のインデックスとして扱います。
文字列の場合、まず列名として検索し、見つからなければ数字文字列として解釈しインデックスとして扱います。
結果はfloat型のNumPy配列として返されます。
:param df: pandas.DataFrame 処理対象のDataFrame。
:param key: ColumnKey (Union[int, str]) 列のインデックス (整数) または列名 (文字列)。数字文字列もインデックスとして扱われます。
:returns: Tuple[str, numpy.ndarray] 列名と、その列のデータ (float型のNumPy配列) のタプル。
:raises KeyError: 指定されたキーに対応する列が見つからない場合に発生します。
"""
if isinstance(key, int):
return df.columns[key], df.iloc[:, key].to_numpy(dtype=float)
# 数字文字列は index としても扱えるようにする
if isinstance(key, str):
ks = key.strip()
if ks.lstrip("+-").isdigit():
idx = int(ks)
return df.columns[idx], df.iloc[:, idx].to_numpy(dtype=float)
if key in df.columns:
return key, df[key].to_numpy(dtype=float)
raise KeyError(f"Column {key!r} not found")
[ドキュメント]
def read_xy(
path: Union[str, Path],
x: ColumnKey = 0,
y: ColumnKey = 1,
*,
sheet_name: Union[int, str] = 0,
xmin: float = -1e100,
xmax: float = 1e100,
dropna: bool = True,
) -> Tuple[np.ndarray, np.ndarray, str, str]:
"""表ファイルから x, y の2列を取り出します。
指定されたファイルから表データを読み込み、x軸とy軸に対応する2つの列を抽出します。
オプションで、x軸の値による範囲フィルタリングや、NaN値の除外を行うことができます。
:param path: Union[str, Path] 読み込むファイルのパス。
:param x: ColumnKey (Union[int, str]) x軸データとして取り出す列のインデックスまたは列名。デフォルトは最初の列 (0)。
:param y: ColumnKey (Union[int, str]) y軸データとして取り出す列のインデックスまたは列名。デフォルトは2番目の列 (1)。
:param sheet_name: Union[int, str] Excelファイルの場合に読み込むシートの名前またはインデックス。デフォルトは最初のシート (0)。
:param xmin: float xデータの最小値。この値未満のデータは除外されます。デフォルトは-1e100。
:param xmax: float xデータの最大値。この値を超えるデータは除外されます。デフォルトは1e100。
:param dropna: bool Trueの場合、xまたはyがNaN (非数) の行を除外します。デフォルトはTrue。
:returns: Tuple[numpy.ndarray, numpy.ndarray, str, str]
フィルタリング後のx軸データ (NumPy配列)、フィルタリング後のy軸データ (NumPy配列)、
x軸の列名 (文字列)、y軸の列名 (文字列) のタプル。
:raises ImportError: pandasまたはopenpyxlがインストールされていない場合に発生します (read_table経由)。
:raises ValueError: サポートされていないファイル形式の場合に発生します (read_table経由)。
:raises KeyError: 指定されたキーに対応する列が見つからない場合に発生します (_column_from_key経由)。
"""
df = read_table(path, sheet_name=sheet_name)
xlabel, xv = _column_from_key(df, x)
ylabel, yv = _column_from_key(df, y)
xv = np.asarray(xv, dtype=float)
yv = np.asarray(yv, dtype=float)
mask = (xv >= xmin) & (xv <= xmax)
if dropna:
mask &= np.isfinite(xv) & np.isfinite(yv)
return xv[mask], yv[mask], str(xlabel), str(ylabel)
[ドキュメント]
def write_excel_tables(
path: Union[str, Path],
tables: Mapping[str, object],
*,
index: bool = False,
) -> None:
"""複数テーブルを Excel の複数シートに保存します。
`tables` 引数で指定された複数のテーブルデータを、一つのExcelファイルの異なるシートに保存します。
`tables` の各値は、以下のいずれかの形式である必要があります:
- `pandas.DataFrame`: そのままExcelシートとして書き込まれます。
- `dict`: キーを列名、値を列データとする辞書。`pandas.DataFrame` に変換されて書き込まれます。
スカラー値や異なる長さのリストも適切に処理され、欠損値は `None` で埋められます。
- 2次元配列 (`numpy.ndarray` など): `pandas.DataFrame` に変換されて書き込まれます。
保存先のディレクトリが存在しない場合は自動的に作成されます。
:param path: Union[str, Path] 保存するExcelファイルのパス。
:param tables: Mapping[str, object] シート名 (文字列) をキーとし、保存するテーブルデータを値とするマッピング。
値は `pandas.DataFrame`、`dict` (列名と値のペア)、または2次元配列 (`numpy.ndarray` など) のいずれかを想定。
:param index: bool DataFrameのインデックスをExcelに出力するかどうか。デフォルトはFalse。
:returns: None
:raises ImportError: pandasまたはopenpyxlがインストールされていない場合に発生します。
"""
try:
import pandas as pd
except Exception as exc: # pragma: no cover
raise ImportError("write_excel_tables requires pandas/openpyxl") from exc
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with pd.ExcelWriter(path, engine="openpyxl") as writer:
for sheet, obj in tables.items():
sheet_name = str(sheet)[:31] or "Sheet"
if hasattr(obj, "to_excel"):
df = obj
elif isinstance(obj, Mapping):
# dict of arrays/scalars
maxlen = 1
for v in obj.values():
if isinstance(v, (str, bytes)):
continue
try:
maxlen = max(maxlen, len(v))
except TypeError:
pass
data = {}
for k, v in obj.items():
if isinstance(v, (str, bytes)):
data[k] = [v] + [None] * (maxlen - 1)
else:
try:
vv = list(v)
data[k] = vv + [None] * (maxlen - len(vv))
except TypeError:
data[k] = [v] + [None] * (maxlen - 1)
df = pd.DataFrame(data)
else:
arr = np.asarray(obj)
df = pd.DataFrame(arr)
df.to_excel(writer, sheet_name=sheet_name, index=index)
[ドキュメント]
def save_json(path: Union[str, Path], data: object, *, indent: int = 2) -> None:
"""データをJSON形式でファイルに保存します。
NumPyのデータ型 (ndarray, integer, floating) は、自動的にPythonの対応する型 (list, int, float) に変換されます。
また、`__dict__` 属性を持つオブジェクトもシリアライズされます。
保存先のディレクトリが存在しない場合は自動的に作成されます。
:param path: Union[str, Path] 保存するJSONファイルのパス。
:param data: object JSONとして保存するデータ。NumPy型はPython型へ変換されます。
`__dict__` 属性を持つカスタムオブジェクトも対応します。
:param indent: int JSON出力のインデントレベル。デフォルトは2。
:returns: None
:raises TypeError: JSONシリアライズできない型のオブジェクトが含まれている場合に発生します。
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
def convert(o):
if isinstance(o, np.ndarray):
return o.tolist()
if isinstance(o, (np.floating, np.integer)):
return o.item()
if hasattr(o, "__dict__"):
return o.__dict__
raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=indent, default=convert)
[ドキュメント]
def load_json(path: Union[str, Path]) -> object:
"""JSON形式のファイルを読み込み、Pythonオブジェクトとして返します。
:param path: Union[str, Path] 読み込むJSONファイルのパス。
:returns: object 読み込まれたJSONデータ (Pythonオブジェクト)。
:raises FileNotFoundError: 指定されたパスのファイルが見つからない場合に発生します。
:raises json.JSONDecodeError: ファイルの内容が有効なJSON形式でない場合に発生します。
"""
with open(path, "r", encoding="utf-8") as f:
return json.load(f)