#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hall測定装置のCSVファイルから、多層モデル解析に使いやすい primary データをExcelに追記するプログラム。
:doc:`collect_resitest8300_usage`
このプログラムは、Resitest 8300型などのHall測定結果CSVファイルから、
比抵抗、Hall係数、キャリア濃度、移動度などのデータを抽出し、
整形された「primary」データとしてExcelシートに追記します。
また、処理したファイルに関する情報も別のシートに記録します。
想定している入力:
- 8310型などの Hall 測定結果CSV
- cp932 / Shift_JIS 系の日本語CSV
- 1ファイル中に複数温度・複数測定No.のブロックが入っている形式
基本方針:
- 第一引数: 出力先 Excel ファイル
- 第二引数以降: CSVファイルまたはワイルドカード
- Excel が存在しなければ新規作成
- Excel が存在すれば単純追記
- 重複チェック・置換はしない
- Hall係数は carrier type が N/P と判定できる場合のみ符号付きに変換
- シートHall係数の再計算値は出力しない
- 装置CSV中の平均値・最小・最大から半幅誤差と相対誤差を出力
使い方:
python collect_hall_primary.py Hall_summary.xlsx "*.CSV"
例:
python collect_hall_primary.py Hall_summary.xlsx "D:\\data\\*.CSV"
python collect_hall_primary.py Hall_summary.xlsx "*.CSV" "subdir\\*.csv"
python collect_hall_primary.py Hall_summary.xlsx "**\\*.CSV" --recursive
python collect_hall_primary.py Hall_summary.xlsx "*.CSV" --dry-run
必要ライブラリ:
pip install openpyxl
"""
from __future__ import annotations
import argparse
import csv
import datetime as _dt
import glob
import math
import re
import sys
import unicodedata
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
FLOAT_RE = re.compile(
r"[-+]?(?:(?:\d+\.\d*)|(?:\.\d+)|(?:\d+))(?:[Ee][+-]?\d+)?"
)
PRIMARY_COLUMNS = [
"source_file",
"source_basename",
"imported_at",
"sample_name",
"measurement_date",
"measurement_time",
"measurement_datetime",
"operator",
"comment",
"measurement_no",
"thickness_value",
"thickness_unit",
"thickness_cm",
"rho_T_K",
"hall_T_K",
"rho_start_time",
"rho_end_time",
"hall_start_time",
"hall_end_time",
"rho_set_current_A",
"hall_set_current_A",
"hall_measured_current_A",
"hall_measured_current_min_A",
"hall_measured_current_max_A",
"hall_B_T",
"sheet_resistance_ohm_sq",
"sheet_conductance_S_sq",
"rho_ohm_cm",
"sigma_S_cm",
"hall_voltage_V",
"hall_phase_deg",
"hall_noise_V",
"hall_noise_percent",
"hall_drift_percent",
"hall_direction_dependence_percent",
"carrier_type_raw",
"hall_sign_status",
"hall_coeff_cm3_C",
"hall_coeff_min_cm3_C",
"hall_coeff_max_cm3_C",
"hall_coeff_err_cm3_C",
"hall_coeff_err_percent",
"carrier_density_cm3",
"carrier_density_min_cm3",
"carrier_density_max_cm3",
"carrier_density_err_cm3",
"carrier_density_err_percent",
"sheet_carrier_density_cm2",
"sheet_carrier_density_min_cm2",
"sheet_carrier_density_max_cm2",
"sheet_carrier_density_err_cm2",
"sheet_carrier_density_err_percent",
"mobility_cm2_Vs",
"mobility_min_cm2_Vs",
"mobility_max_cm2_Vs",
"mobility_err_cm2_Vs",
"mobility_err_percent",
"rho_noise_percent",
"rho_F_value",
"quality_flag",
"parser_warning",
]
FILES_COLUMNS = [
"source_file",
"source_basename",
"file_size",
"modified_time",
"imported_at",
"sample_name",
"measurement_date",
"measurement_time",
"measurement_datetime",
"operator",
"comment",
"thickness_value",
"thickness_unit",
"thickness_cm",
"num_measurements",
"parser_warning",
]
# CSV中のパラメータ名 -> (列名ベース, 単位サフィックス, Hall係数として符号変換するか)
PARAM_MAP = {
"ホール係数": ("hall_coeff", "cm3_C", True),
"キャリア濃度": ("carrier_density", "cm3", False),
"シートキャリア濃度": ("sheet_carrier_density", "cm2", False),
"移動度": ("mobility", "cm2_Vs", False),
}
[ドキュメント]
def norm(s: Any) -> str:
"""全角英数字や半角カナを比較しやすい形に正規化する。
詳細説明:
`unicodedata.normalize("NFKC", ...)` を使用して、
互換等価性を持つ文字を正規化し、前後の空白を除去します。
これにより、例えば「ABC」や「アイウ」のような文字が
「ABC」や「アイウ」に変換され、比較やパースが容易になります。
:param s: 正規化する任意の入力値。
:returns: 正規化された文字列。
"""
return unicodedata.normalize("NFKC", str(s or "")).strip()
[ドキュメント]
def clean_cell(s: Any) -> str:
"""CSVセルのNUL文字と前後空白を除去する。
詳細説明:
CSVファイルによっては、内部にNUL文字 (\\x00) が含まれていることがあり、
これを削除することで文字列処理の問題を回避します。
また、前後の空白も `strip()` メソッドで除去します。
入力が `None` の場合は空文字列を返します。
:param s: CSVセルから読み取られた値。
:returns: クリーンアップされた文字列。
"""
if s is None:
return ""
return str(s).replace("\x00", "").strip()
[ドキュメント]
def parse_float(s: Any) -> Optional[float]:
"""セル文字列から最初の数値を取り出す。
詳細説明:
装置CSVでは `* 1.23E+4`, `# -3.0E-2` のような警告記号付き数値が
出力されることがあります。この関数は、単純な `float()` 変換ではなく、
正規表現 `FLOAT_RE` を用いて文字列中の最初の数値部分を抽出し、
それを浮動小数点数として解析します。
また、カンマ(`,`)や全角ハイフン(`−`)も除去してパースを試みます。
:param s: 数値を含む可能性のある文字列または任意のオブジェクト。
:returns: 抽出された浮動小数点数、または数値が見つからない場合は None。
"""
t = norm(str(s or "").replace("\x00", ""))
t = t.replace(",", "")
t = t.replace("−", "-")
m = FLOAT_RE.search(t)
if not m:
return None
try:
return float(m.group(0))
except ValueError:
return None
[ドキュメント]
def parse_unit(s: Any) -> str:
"""[ mm ] のような単位表記を mm に整形する。
詳細説明:
入力された単位文字列から、角括弧 `[]` や全角の `[]` を除去し、
さらにマイクロ記号 `μ` や `µ` を `um` に統一して、
単位を比較しやすい標準的な形式に整形します。
:param s: 単位を表す文字列。
:returns: 整形された単位文字列。
"""
u = norm(s)
for ch in "[][]":
u = u.replace(ch, "")
u = u.replace("μ", "um").replace("µ", "um")
return u.strip()
[ドキュメント]
def thickness_to_cm(value: Optional[float], unit: str) -> Optional[float]:
"""厚さを cm に換算する。未知単位なら None。
詳細説明:
入力された厚さの値と単位から、その厚さをセンチメートル (cm) に換算します。
以下の単位に対応しています。
- cm: 1.0
- mm: 1.0e-1
- um (μm): 1.0e-4
- nm: 1.0e-7
- m: 100.0
- A, Å, ang, angstrom: 1.0e-8
対応していない単位が指定された場合、または `value` が None の場合は None を返します。
:param value: 厚さの数値。
:param unit: 厚さの単位を表す文字列。
:returns: cm に換算された厚さ、または換算できない場合は None。
"""
if value is None:
return None
u = norm(unit).lower().replace("μ", "u").replace("µ", "u")
u = u.replace(" ", "")
mapping = {
"cm": 1.0,
"mm": 1.0e-1,
"um": 1.0e-4,
"nm": 1.0e-7,
"m": 100.0,
"a": 1.0e-8,
"å": 1.0e-8,
"ang": 1.0e-8,
"angstrom": 1.0e-8,
}
factor = mapping.get(u)
if factor is None:
return None
return value * factor
[ドキュメント]
def read_csv_rows(path: Path, encoding: str = "cp932") -> List[List[str]]:
"""CSVを読み込み、セル配列として返す。
詳細説明:
指定されたパスのCSVファイルを、指定されたエンコーディングで読み込みます。
読み込み中に発生するデコードエラーは `replace` モードで処理され、
NUL文字 (\\x00) は除去されます。
各セルは `clean_cell` 関数で整形され、文字列のリストのリストとして返されます。
:param path: 読み込むCSVファイルのパス。
:param encoding: CSVファイルの文字エンコーディング。デフォルトは "cp932"。
:returns: CSVの内容を表す文字列のリストのリスト。
"""
raw = path.read_bytes()
text = raw.decode(encoding, errors="replace").replace("\x00", "")
return [[clean_cell(c) for c in row] for row in csv.reader(text.splitlines())]
[ドキュメント]
def first_nonempty(row: Sequence[str]) -> str:
"""行の最初の空でないセルを返す。
詳細説明:
指定された行 (文字列のシーケンス) を先頭から順に走査し、
最初に見つかった空でない文字列を返します。
行が空であるか、すべてのセルが空文字列である場合は空文字列を返します。
:param row: 文字列のシーケンス(CSVの1行に相当)。
:returns: 行の最初の空でないセル、または空文字列。
"""
for c in row:
if c:
return c
return ""
[ドキュメント]
def row_label(row: Sequence[str]) -> str:
"""行のラベル(最初のセル)を正規化して返す。
詳細説明:
CSVの行の最初のセルをラベルとみなし、`norm` 関数で正規化します。
これにより、ラベルの比較やキーとしての使用が容易になります。
行が空の場合は空文字列を返します。
:param row: 文字列のシーケンス(CSVの1行に相当)。
:returns: 正規化された行ラベル、または空文字列。
"""
return norm(row[0]) if row else ""
[ドキュメント]
def split_blocks(rows: List[List[str]]) -> Tuple[List[List[str]], List[Tuple[str, List[List[str]]]]]:
"""CSVの行データを `<<<...>>>` ブロックに分割する。
詳細説明:
Hall測定装置のCSVファイルは、ファイル冒頭の共通情報と、
`<<<比抵抗測定結果>>>` や `<<<AC磁場ホール測定結果>>>` のような
ブロックで構成されています。この関数は、これらのブロックマーカーを
識別し、CSVデータをヘッダー行とタイトル付きのブロックに分割します。
最初の `<<<...>>>` が現れるまでの行はヘッダー行として扱われます。
:param rows: CSVファイルから読み込まれたすべての行データ。
:returns: ヘッダー行のリストと、(ブロックタイトル, ブロック内の行データ) のタプルのリスト。
"""
header_rows: List[List[str]] = []
blocks: List[Tuple[str, List[List[str]]]] = []
current_title: Optional[str] = None
current_rows: List[List[str]] = []
for row in rows:
first = first_nonempty(row)
nfirst = norm(first)
if nfirst.startswith("<<<") and nfirst.endswith(">>>"):
if current_title is None:
header_rows.extend(current_rows)
else:
blocks.append((current_title, current_rows))
current_title = first
current_rows = []
else:
current_rows.append(row)
if current_title is None:
header_rows.extend(current_rows)
else:
blocks.append((current_title, current_rows))
return header_rows, blocks
[ドキュメント]
def parse_global_info(header_rows: List[List[str]]) -> Dict[str, Any]:
"""ファイル冒頭の共通情報を読む。
詳細説明:
CSVファイルのヘッダー部分から、試料名、測定年月日、測定者、コメント、
試料厚みなどのグローバルな情報を抽出します。
試料厚みについては、値と単位をパースし、センチメートルへの換算も行います。
抽出された情報は辞書として返されます。
:param header_rows: ファイル冒頭のヘッダー行のリスト。
:returns: 抽出されたグローバル情報を含む辞書。
"""
info: Dict[str, Any] = {}
for row in header_rows:
lbl = row_label(row)
if "試料名" in lbl:
info["sample_name"] = row[1] if len(row) > 1 else ""
elif "測定年月日" in lbl:
date = row[1] if len(row) > 1 else ""
time = row[2] if len(row) > 2 else ""
info["measurement_date"] = date
info["measurement_time"] = time
info["measurement_datetime"] = (date + " " + time).strip()
elif "測定者" in lbl:
info["operator"] = row[1] if len(row) > 1 else ""
elif "コメント" in lbl:
info["comment"] = row[1] if len(row) > 1 else ""
elif "試料厚み" in lbl or "試料厚さ" in lbl:
value = parse_float(row[1] if len(row) > 1 else "")
unit = parse_unit(row[2] if len(row) > 2 else "")
info["thickness_value"] = value
info["thickness_unit"] = unit
info["thickness_cm"] = thickness_to_cm(value, unit)
return info
[ドキュメント]
def parse_common_block_items(block_rows: List[List[str]]) -> Dict[str, Any]:
"""比抵抗・Hallの各ブロック冒頭に共通する情報を読む。
詳細説明:
比抵抗測定結果ブロックやホール測定結果ブロックの冒頭に共通して現れる、
測定No、測定温度、測定開始/終了時刻、設定電流などの情報を抽出します。
これらの情報は、各測定レコードの基本情報として使用されます。
:param block_rows: 特定の測定ブロック内の行データ。
:returns: 抽出された共通情報を含む辞書。
"""
d: Dict[str, Any] = {}
for row in block_rows:
lbl = row_label(row)
if lbl.startswith("測定No") or "測定No" in lbl:
v = parse_float(row[1] if len(row) > 1 else "")
d["measurement_no"] = int(v) if v is not None else None
elif lbl == "測定温度":
d["T_K"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "測定開始時刻":
d["start_time"] = row[1] if len(row) > 1 else ""
elif lbl == "測定終了時刻":
d["end_time"] = row[1] if len(row) > 1 else ""
elif lbl == "設定電流":
d["set_current_A"] = parse_float(row[1] if len(row) > 1 else "")
return d
[ドキュメント]
def parse_rho_result(block_rows: List[List[str]]) -> Dict[str, Any]:
"""`<<<比抵抗測定結果>>>` ブロックを解析する。
詳細説明:
比抵抗測定結果ブロックから、F値、比抵抗値、シート抵抗値、ノイズ
などの測定結果を抽出します。
共通情報 (`parse_common_block_items` から得られるもの) も取り込み、
適切なキー名で辞書に格納します。
:param block_rows: 比抵抗測定結果ブロック内の行データ。
:returns: 抽出された比抵抗測定結果を含む辞書。
"""
common = parse_common_block_items(block_rows)
d: Dict[str, Any] = {}
if common.get("measurement_no") is not None:
d["measurement_no"] = common["measurement_no"]
d["rho_T_K"] = common.get("T_K")
d["rho_start_time"] = common.get("start_time", "")
d["rho_end_time"] = common.get("end_time", "")
d["rho_set_current_A"] = common.get("set_current_A")
for row in block_rows:
lbl = row_label(row)
if lbl == "F値":
d["rho_F_value"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "比抵抗値":
d["rho_ohm_cm"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "シート抵抗値":
d["sheet_resistance_ohm_sq"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "ノイズ":
unit = parse_unit(row[2] if len(row) > 2 else "")
if "%" in unit or "%" in unit:
d["rho_noise_percent"] = parse_float(row[1] if len(row) > 1 else "")
return d
[ドキュメント]
def detect_carrier_sign(carrier_type_raw: str) -> Tuple[Optional[int], str]:
"""キャリアタイプからHall係数の符号を決定する。
詳細説明:
入力されたキャリアタイプ文字列を解析し、Hall係数の符号を決定します。
- "N" を含む場合は N型と判断し、符号 `-1` を返します。
- "P" を含む場合は P型と判断し、符号 `+1` を返します。
- 上記以外の場合は符号を決定できないため `None` を返します。
また、キャリアタイプの状態を示す文字列 ("N", "P", "unknown") も返します。
:param carrier_type_raw: CSVから読み取られたキャリアタイプ文字列。
:returns: Hall係数の符号 (1, -1, または None) と、キャリアタイプの状態を示す文字列のタプル。
"""
t = norm(carrier_type_raw).upper()
if "N" in t:
return -1, "N"
if "P" in t:
return +1, "P"
return None, "unknown"
[ドキュメント]
def normalized_param_label(label: str) -> str:
"""パラメータラベルを正規化する。
詳細説明:
入力されたパラメータラベルを `norm` 関数で正規化し、さらに空白を除去します。
これにより、CSV内の様々な表記揺れに対応し、ラベルの一貫した比較を可能にします。
:param label: 正規化するパラメータラベル文字列。
:returns: 正規化・空白除去されたパラメータラベル文字列。
"""
return norm(label).replace(" ", "")
[ドキュメント]
def add_param_stats(
d: Dict[str, Any],
base: str,
unit_suffix: str,
avg: Optional[float],
vmin: Optional[float],
vmax: Optional[float],
*,
signed: bool = False,
sign: Optional[int] = None,
) -> None:
"""平均・最小・最大から半幅誤差と相対誤差を計算し辞書に追加する。
詳細説明:
測定値の平均値 (avg)、最小値 (vmin)、最大値 (vmax) を基に、
半幅誤差 (err) と相対誤差 (err_percent) を計算し、
これらを指定されたベース名と単位サフィックスを用いて辞書 `d` に追加します。
Hall係数 (signed=True) の場合、装置出力が絶対値で出ていると仮定し、
`sign` 引数 (N型:-1, P型:+1) に応じて符号を付与します。
N型の場合、min/maxは数直線上で min=-max_abs, max=-min_abs として保存されます。
符号変換が必要な場合、`sign` が `None` でない場合に適用されます。
:param d: 結果を追加する辞書。
:param base: パラメータのベース名 (例: "hall_coeff")。
:param unit_suffix: 単位のサフィックス (例: "cm3_C")。
:param avg: パラメータの平均値。
:param vmin: パラメータの最小値。
:param vmax: パラメータの最大値。
:param signed: Hall係数などのように符号付きで扱うべきかどうかのフラグ。
:param sign: `signed` が True の場合に適用する符号 (1 または -1)。
:returns: None
"""
if signed and sign is not None:
if avg is not None:
avg = sign * abs(avg)
if vmin is not None and vmax is not None:
a = abs(vmin)
b = abs(vmax)
if sign < 0:
vmin, vmax = -max(a, b), -min(a, b)
else:
vmin, vmax = min(a, b), max(a, b)
else:
if vmin is not None:
vmin = sign * abs(vmin)
if vmax is not None:
vmax = sign * abs(vmax)
d[f"{base}_{unit_suffix}"] = avg
d[f"{base}_min_{unit_suffix}"] = vmin
d[f"{base}_max_{unit_suffix}"] = vmax
if vmin is not None and vmax is not None:
err = abs(vmax - vmin) / 2.0
else:
err = None
d[f"{base}_err_{unit_suffix}"] = err
if err is not None and avg not in (None, 0):
d[f"{base}_err_percent"] = abs(err / avg) * 100.0
else:
d[f"{base}_err_percent"] = None
[ドキュメント]
def estimate_hall_measured_current(block_rows: List[List[str]]) -> Dict[str, Optional[float]]:
"""Hall測定結果ブロック内の端子間テーブルから実測電流の代表値を推定する。
詳細説明:
Hall測定結果ブロック内の「端子間ホール起電圧」のテーブルより前の行で、
「電流 [A], 電圧 [V], ... 電流 [A], 電圧 [V], ...」のような形式で
記録されている実測電流値を抽出します。
抽出された電流値の絶対値の平均、最小値、最大値を計算して返します。
電流値として妥当な範囲 (絶対値が 1.0 A 未満) のみを採用し、
誤って温度や磁場などを拾わないようにします。
:param block_rows: Hall測定結果ブロック内の行データ。
:returns: 推定された実測電流の平均値、最小値、最大値を含む辞書。
"""
current_values: List[float] = []
for row in block_rows:
joined = norm("".join(row))
if "端子間ホール起電圧" in joined:
break
# 「電流 [A], 電圧 [V], ... 電流 [A], 電圧 [V], ...」の表で
# 電流はおおむね 0列目と4列目に入る。
for idx in (0, 4):
if len(row) <= idx:
continue
v = parse_float(row[idx])
if v is None:
continue
# 電流値として妥当な範囲だけ採用。温度や磁場などの誤拾いを避ける。
if abs(v) < 1.0:
current_values.append(v)
if not current_values:
return {
"hall_measured_current_A": None,
"hall_measured_current_min_A": None,
"hall_measured_current_max_A": None,
}
return {
"hall_measured_current_A": sum(abs(x) for x in current_values) / len(current_values),
"hall_measured_current_min_A": min(current_values),
"hall_measured_current_max_A": max(current_values),
}
[ドキュメント]
def parse_hall_result(block_rows: List[List[str]]) -> Dict[str, Any]:
"""`<<<AC磁場ホール測定結果>>>` ブロックを解析する。
詳細説明:
Hall測定結果ブロックから、磁場、ホール起電圧、位相、ノイズ、ドリフト率、
方向依存性、キャリアタイプ、Hall係数、キャリア濃度、移動度などの
測定結果を抽出します。
共通情報 (`parse_common_block_items` から得られるもの) も取り込み、
適切なキー名で辞書に格納します。
キャリアタイプに基づいてHall係数の符号を決定し、必要に応じて符号付きに変換します。
パラメータの平均値、最小値、最大値からは半幅誤差と相対誤差も計算されます。
解析中に検出された警告は `parser_warning` に格納されます。
:param block_rows: Hall測定結果ブロック内の行データ。
:returns: 抽出されたHall測定結果を含む辞書。
"""
common = parse_common_block_items(block_rows)
d: Dict[str, Any] = {}
warnings: List[str] = []
if common.get("measurement_no") is not None:
d["measurement_no"] = common["measurement_no"]
d["hall_T_K"] = common.get("T_K")
d["hall_start_time"] = common.get("start_time", "")
d["hall_end_time"] = common.get("end_time", "")
d["hall_set_current_A"] = common.get("set_current_A")
for row in block_rows:
lbl = row_label(row)
if lbl == "磁場":
d["hall_B_T"] = parse_float(row[1] if len(row) > 1 else "")
d.update(estimate_hall_measured_current(block_rows))
in_total = False
carrier_type_raw = ""
for row in block_rows:
lbl = row_label(row)
joined = norm("".join(row))
if "<総合結果>" in joined or "< 総合結果 >" in "".join(row):
in_total = True
continue
if not in_total:
continue
if lbl == "ホール起電圧":
d["hall_voltage_V"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "位相":
d["hall_phase_deg"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "ノイズ":
unit = parse_unit(row[2] if len(row) > 2 else "")
if "V" in unit.upper():
d["hall_noise_V"] = parse_float(row[1] if len(row) > 1 else "")
elif "%" in unit or "%" in unit:
d["hall_noise_percent"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "ドリフト率":
d["hall_drift_percent"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "方向依存性":
d["hall_direction_dependence_percent"] = parse_float(row[1] if len(row) > 1 else "")
elif lbl == "キャリアタイプ":
carrier_type_raw = row[1] if len(row) > 1 else ""
d["carrier_type_raw"] = carrier_type_raw
_sign, status = detect_carrier_sign(carrier_type_raw)
d["hall_sign_status"] = status
sign, status = detect_carrier_sign(carrier_type_raw)
if not carrier_type_raw:
d.setdefault("carrier_type_raw", "")
d["hall_sign_status"] = "missing"
warnings.append("carrier_type missing")
elif sign is None:
d["hall_sign_status"] = "unknown"
warnings.append("carrier_type unknown; hall coefficient left as reported")
for row in block_rows:
lbl = normalized_param_label(row_label(row))
for key, (base, unit_suffix, is_signed) in PARAM_MAP.items():
if lbl == key:
avg = parse_float(row[1] if len(row) > 1 else "")
vmin = parse_float(row[2] if len(row) > 2 else "")
vmax = parse_float(row[3] if len(row) > 3 else "")
if avg is None:
warnings.append(f"{base} average missing")
if vmin is None:
warnings.append(f"{base} min missing")
if vmax is None:
warnings.append(f"{base} max missing")
add_param_stats(
d,
base,
unit_suffix,
avg,
vmin,
vmax,
signed=is_signed,
sign=sign,
)
break
d["parser_warning"] = "; ".join(warnings)
return d
[ドキュメント]
def make_quality_flag(record: Dict[str, Any]) -> str:
"""Hallデータの簡易品質フラグを作成する。
詳細説明:
これは最終判断ではなく、Excelで目視確認するための目印です。
以下の基準でフラグを決定します。
- ホールノイズまたは方向依存性が50%以上: "bad"
- `parser_warning` に "missing" が含まれる場合: "warning"
- `parser_warning` に "unknown" が含まれる場合: "warning"
- ホールノイズまたは方向依存性が10%以上: "warning"
- ホールノイズと方向依存性の両方が値がない場合: "warning"
- それ以外の場合: "good"
:param record: 1つの測定レコードを含む辞書。
:returns: 簡易品質フラグ ("good", "warning", "bad")。
"""
warning_text = str(record.get("parser_warning") or "")
noise = record.get("hall_noise_percent")
dep = record.get("hall_direction_dependence_percent")
values = [v for v in (noise, dep) if isinstance(v, (int, float))]
if any(v >= 50 for v in values):
return "bad"
if "missing" in warning_text:
return "warning"
if "unknown" in warning_text:
return "warning"
if any(v >= 10 for v in values):
return "warning"
if not values:
return "warning"
return "good"
[ドキュメント]
def build_records_from_csv(path: Path, encoding: str = "cp932") -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
"""1つのCSVファイルからExcel出力用レコードを作成する。
詳細説明:
指定されたCSVファイル全体を読み込み、ファイル冒頭のグローバル情報と、
各測定ブロック(比抵抗、Hall)の情報を抽出して、
Excelの「primary」シートと「files」シートに書き込むための
レコードのリストを生成します。
同一の `measurement_no` を持つ比抵抗データとHallデータは結合されます。
インポート日時、シートコンダクタンス、品質フラグなども計算・追加されます。
ファイル全体で発生した警告は `global_info` に集約されます。
:param path: 解析対象のCSVファイルのパス。
:param encoding: CSVファイルの文字エンコーディング。デフォルトは "cp932"。
:returns: (primaryデータのレコードリスト, ファイル全体のグローバル情報を含む辞書) のタプル。
"""
rows = read_csv_rows(path, encoding=encoding)
header_rows, blocks = split_blocks(rows)
global_info = parse_global_info(header_rows)
by_no: Dict[int, Dict[str, Any]] = {}
file_warnings: List[str] = []
for title, block_rows in blocks:
nt = norm(title)
if "比抵抗測定結果" in nt and "詳細" not in nt:
d = parse_rho_result(block_rows)
no = d.get("measurement_no")
if no is None:
file_warnings.append("rho block without measurement_no")
continue
by_no.setdefault(int(no), {}).update(d)
elif ("AC磁場ホール測定結果" in nt or "磁場ホール測定結果" in nt) and "詳細" not in nt:
d = parse_hall_result(block_rows)
no = d.get("measurement_no")
if no is None:
file_warnings.append("hall block without measurement_no")
continue
# 既存のparser_warningとマージ
old = by_no.setdefault(int(no), {}).get("parser_warning", "")
new = d.get("parser_warning", "")
if old and new:
d["parser_warning"] = old + "; " + new
elif old:
d["parser_warning"] = old
by_no.setdefault(int(no), {}).update(d)
imported_at = _dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
out: List[Dict[str, Any]] = []
for no in sorted(by_no):
rec: Dict[str, Any] = {}
rec.update(global_info)
rec.update(by_no[no])
rec["source_file"] = str(path)
rec["source_basename"] = path.name
rec["imported_at"] = imported_at
# 厚さ非依存の解析に便利なシートコンダクタンス。
# シートHall係数はユーザ確認用の再計算値になるため出力しない。
Rs = rec.get("sheet_resistance_ohm_sq")
rho = rec.get("rho_ohm_cm")
if isinstance(Rs, (int, float)) and Rs != 0:
rec["sheet_conductance_S_sq"] = 1.0 / Rs
if isinstance(rho, (int, float)) and rho != 0:
rec["sigma_S_cm"] = 1.0 / rho
rec["quality_flag"] = make_quality_flag(rec)
out.append(rec)
global_info = dict(global_info)
global_info["parser_warning"] = "; ".join(file_warnings)
global_info["imported_at"] = imported_at
return out, global_info
[ドキュメント]
def safe_excel_value(v: Any) -> Any:
"""openpyxlに渡せる値に整える。
詳細説明:
Excelのセルに書き込む値として不適切な値(`None`、`NaN`、`Infinity`)を
`None` に変換します。
openpyxlライブラリは `NaN` や `Infinity` を直接書き込むとエラーになるため、
これらの値を安全に処理するためにこの関数を使用します。
:param v: Excelセルに書き込む可能性のある値。
:returns: openpyxlに安全に渡せる値。不適切な場合は None。
"""
if v is None:
return None
if isinstance(v, float) and (math.isnan(v) or math.isinf(v)):
return None
return v
[ドキュメント]
def import_openpyxl():
"""openpyxlライブラリをインポートする。
詳細説明:
openpyxlとその主要なクラス、関数をインポートします。
openpyxlがインストールされていない場合はエラーメッセージを表示し、
`SystemExit` を発生させてプログラムを終了します。
:returns: openpyxlモジュールとその主要なクラス、関数のタプル。
:raises SystemExit: openpyxlがインストールされていない場合。
"""
try:
import openpyxl
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
except ImportError as exc:
print("ERROR: openpyxl が必要です。次のコマンドでインストールしてください:", file=sys.stderr)
print(" pip install openpyxl", file=sys.stderr)
raise SystemExit(2) from exc
return openpyxl, Workbook, load_workbook, Font, PatternFill, Alignment, get_column_letter
[ドキュメント]
def append_records(ws: Any, headers: List[str], records: List[Dict[str, Any]]) -> None:
"""指定されたレコードをExcelシートに追記する。
詳細説明:
各レコード (辞書) を、指定された `headers` の順序に従って
Excelシート `ws` の新しい行として追加します。
`safe_excel_value` 関数を使用して、Excelに安全な値のみが書き込まれるようにします。
:param ws: データを追記するopenpyxlのWorksheetオブジェクト。
:param headers: シートのヘッダー列名のリスト。この順序でレコードが書き込まれる。
:param records: 追記するレコードのリスト。各レコードは辞書形式。
:returns: None
"""
for rec in records:
row = [safe_excel_value(rec.get(h)) for h in headers]
ws.append(row)
[ドキュメント]
def style_sheet(ws: Any) -> None:
"""Excelシートのスタイルを設定する。
詳細説明:
シートのヘッダー行に背景色、太字フォント、中央揃えを適用し、
A2セルでのウィンドウ固定とオートフィルターを設定します。
また、列幅をヘッダーの内容や特定のキーに基づいて自動調整します。
数値を含む列には、データの種類に応じて適切な数値書式(指数表記、整数、小数点以下3桁)
を適用します。
:param ws: スタイルを設定するopenpyxlのWorksheetオブジェクト。
:returns: None
"""
_openpyxl, _Workbook, _load_workbook, Font, PatternFill, Alignment, get_column_letter = import_openpyxl()
header_fill = PatternFill("solid", fgColor="D9EAF7")
header_font = Font(bold=True)
ws.freeze_panes = "A2"
if ws.max_column > 0 and ws.max_row > 1:
ws.auto_filter.ref = ws.dimensions
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
for col_idx in range(1, ws.max_column + 1):
letter = get_column_letter(col_idx)
header = str(ws.cell(row=1, column=col_idx).value or "")
if header in ("source_file", "parser_warning", "comment"):
width = 42
elif header in ("source_basename", "measurement_datetime"):
width = 28
elif "time" in header or "date" in header:
width = 18
elif header in ("sample_name",):
width = 22
else:
width = max(12, min(24, len(header) + 2))
ws.column_dimensions[letter].width = width
# 数値列の書式。幅広い桁を扱うので指数表記を基本にする。
numeric_headers = {
h for h in [ws.cell(row=1, column=c).value for c in range(1, ws.max_column + 1)]
if isinstance(h, str)
and (
h.endswith("_A")
or h.endswith("_V")
or h.endswith("_T")
or h.endswith("_K")
or h.endswith("_cm")
or h.endswith("_cm2")
or h.endswith("_cm3")
or h.endswith("_cm3_C")
or h.endswith("_cm2_Vs")
or h.endswith("_ohm_cm")
or h.endswith("_ohm_sq")
or h.endswith("_S_sq")
or h.endswith("_S_cm")
or h.endswith("_percent")
or h in ("rho_F_value", "measurement_no", "thickness_value")
)
}
header_to_col = {
ws.cell(row=1, column=c).value: c for c in range(1, ws.max_column + 1)
}
for h in numeric_headers:
col = header_to_col.get(h)
if not col:
continue
for row in range(2, ws.max_row + 1):
cell = ws.cell(row=row, column=col)
if isinstance(cell.value, (int, float)):
if h in ("measurement_no",):
cell.number_format = "0"
elif h.endswith("_percent") or h == "rho_F_value":
cell.number_format = "0.000"
else:
cell.number_format = "0.000000E+00"
[ドキュメント]
def append_to_excel(
excel_path: Path,
primary_records: List[Dict[str, Any]],
file_records: List[Dict[str, Any]],
*,
primary_sheet: str = "primary",
files_sheet: str = "files",
write_files_sheet: bool = True,
) -> None:
"""Excelファイルに測定結果とファイル履歴を追記する。
詳細説明:
指定されたExcelファイル `excel_path` に、
`primary_records` を `primary_sheet` に、
`file_records` を `files_sheet` に追記します。
Excelファイルが存在しない場合は新規作成され、
存在する場合は既存のデータに追記されます。
各シートは `ensure_sheet_with_headers` でヘッダーが適切に管理され、
`style_sheet` で書式が設定されます。
:param excel_path: 出力先となるExcelファイルのパス。
:param primary_records: primaryシートに追記する測定レコードのリスト。
:param file_records: filesシートに追記するファイル履歴レコードのリスト。
:param primary_sheet: primaryデータを書き込むシート名。デフォルトは "primary"。
:param files_sheet: ファイル履歴を書き込むシート名。デフォルトは "files"。
:param write_files_sheet: filesシートを作成・追記するかどうかのフラグ。
:returns: None
"""
_openpyxl, Workbook, load_workbook, *_ = import_openpyxl()
if excel_path.exists():
wb = load_workbook(excel_path)
else:
wb = Workbook()
# デフォルトSheetを後で不要なら削除する
if wb.active and wb.active.title == "Sheet":
wb.remove(wb.active)
ws, headers = ensure_sheet_with_headers(wb, primary_sheet, PRIMARY_COLUMNS)
append_records(ws, headers, primary_records)
style_sheet(ws)
if write_files_sheet:
ws_files, file_headers = ensure_sheet_with_headers(wb, files_sheet, FILES_COLUMNS)
append_records(ws_files, file_headers, file_records)
style_sheet(ws_files)
excel_path.parent.mkdir(parents=True, exist_ok=True)
wb.save(excel_path)
[ドキュメント]
def make_file_record(path: Path, global_info: Dict[str, Any], num_measurements: int) -> Dict[str, Any]:
"""1つのCSVファイルに関する履歴レコードを作成する。
詳細説明:
指定されたファイルパス `path` と、そのファイルから抽出された
グローバル情報 `global_info` を基に、ファイル履歴シート (`files_sheet`)
に書き込むためのレコードを作成します。
ファイルサイズ、最終更新時刻、解析時に検出された警告なども含まれます。
:param path: 処理対象のCSVファイルのパス。
:param global_info: CSVファイルから抽出されたグローバル情報を含む辞書。
:param num_measurements: そのファイルから抽出された測定レコードの数。
:returns: ファイル履歴レコードを含む辞書。
"""
stat = path.stat()
rec: Dict[str, Any] = {
"source_file": str(path),
"source_basename": path.name,
"file_size": stat.st_size,
"modified_time": _dt.datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
"num_measurements": num_measurements,
}
for key in (
"imported_at",
"sample_name",
"measurement_date",
"measurement_time",
"measurement_datetime",
"operator",
"comment",
"thickness_value",
"thickness_unit",
"thickness_cm",
"parser_warning",
):
rec[key] = global_info.get(key)
return rec
[ドキュメント]
def print_dry_run_summary(records: List[Dict[str, Any]]) -> None:
"""ドライラン実行時の解析結果の概要を表示する。
詳細説明:
`--dry-run` オプションが指定された際に、Excelに書き込む代わりに
解析された測定レコードの主要な情報を標準出力に表形式で表示します。
これにより、実際の書き込み前にデータの内容を確認できます。
:param records: 解析された測定レコードのリスト。
:returns: None
"""
if not records:
print("No records parsed.")
return
print("Parsed records:")
columns = [
"source_basename",
"measurement_no",
"rho_T_K",
"hall_T_K",
"sheet_resistance_ohm_sq",
"rho_ohm_cm",
"hall_voltage_V",
"carrier_type_raw",
"hall_sign_status",
"hall_coeff_cm3_C",
"carrier_density_cm3",
"sheet_carrier_density_cm2",
"mobility_cm2_Vs",
"quality_flag",
]
print("\t".join(columns))
for rec in records:
vals = []
for col in columns:
v = rec.get(col)
if isinstance(v, float):
vals.append(f"{v:.6E}")
else:
vals.append("" if v is None else str(v))
print("\t".join(vals))
[ドキュメント]
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
"""コマンドライン引数を解析する。
詳細説明:
プログラムの実行時に渡されるコマンドライン引数を解析し、
設定値を含む `argparse.Namespace` オブジェクトを返します。
出力Excelファイル、入力CSVファイル、エンコーディング、再帰検索、
ドライランモード、シート名、verboseモードなどのオプションを定義しています。
:param argv: コマンドライン引数のリスト (テスト目的などでNone以外の値を渡す)。デフォルトはsys.argv[1:]。
:returns: 解析された引数を含む `argparse.Namespace` オブジェクト。
"""
parser = argparse.ArgumentParser(
description="Hall測定CSVから多層モデル解析用primaryデータをExcelに追記する。"
)
parser.add_argument(
"excel_file",
help="出力先Excelファイル。存在しなければ新規作成、存在すれば追記。",
)
parser.add_argument(
"data_files",
nargs="+",
help="入力CSVファイルまたはワイルドカード。例: *.CSV \"subdir\\*.csv\"",
)
parser.add_argument(
"--encoding",
default="cp932",
help="入力CSVの文字コード。デフォルト: cp932",
)
parser.add_argument(
"--recursive",
action="store_true",
help="globで ** を使った再帰検索を有効にする。",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Excelに書き込まず、読み取り結果の概要だけ表示する。",
)
parser.add_argument(
"--primary-sheet",
default="primary",
help="primaryデータを書き込むシート名。デフォルト: primary",
)
parser.add_argument(
"--files-sheet",
default="files",
help="ファイル単位の履歴を書き込むシート名。デフォルト: files",
)
parser.add_argument(
"--no-files-sheet",
action="store_true",
help="filesシートを作成・追記しない。",
)
parser.add_argument(
"--verbose",
action="store_true",
help="ファイルごとの読み取り件数を表示する。",
)
return parser.parse_args(argv)
[ドキュメント]
def main(argv: Optional[Sequence[str]] = None) -> int:
"""プログラムのメインエントリポイント。
詳細説明:
コマンドライン引数を解析し、指定されたCSVファイルを読み込み、
測定データを抽出してExcelファイルに書き込む一連の処理を実行します。
入力ファイルが見つからない場合や、解析できるデータがない場合は
エラーメッセージを出力し、適切な終了コードを返します。
`--dry-run` オプションが指定された場合は、Excelへの書き込みは行わず、
解析結果の概要を表示します。
:param argv: コマンドライン引数のリスト (テスト目的などでNone以外の値を渡す)。デフォルトはsys.argv[1:]。
:returns: 成功時は 0、エラー時は 1 を返す。
"""
args = parse_args(argv)
excel_path = Path(args.excel_file)
input_files = expand_input_patterns(args.data_files, recursive=args.recursive)
if not input_files:
print("ERROR: 入力ファイルが見つかりません。", file=sys.stderr)
for pat in args.data_files:
print(f" pattern: {pat}", file=sys.stderr)
return 1
all_records: List[Dict[str, Any]] = []
file_records: List[Dict[str, Any]] = []
for path in input_files:
try:
records, global_info = build_records_from_csv(path, encoding=args.encoding)
except Exception as exc:
print(f"ERROR: failed to parse {path}: {exc}", file=sys.stderr)
if args.verbose:
import traceback
traceback.print_exc()
continue
all_records.extend(records)
file_records.append(make_file_record(path, global_info, len(records)))
if args.verbose:
print(f"{path}: {len(records)} measurement rows")
if args.dry_run:
print_dry_run_summary(all_records)
print(f"\nTotal files : {len(input_files)}")
print(f"Total records : {len(all_records)}")
return 0
if not all_records:
print("ERROR: 追記できる測定データがありません。", file=sys.stderr)
return 1
append_to_excel(
excel_path,
all_records,
file_records,
primary_sheet=args.primary_sheet,
files_sheet=args.files_sheet,
write_files_sheet=not args.no_files_sheet,
)
print(f"Excel written : {excel_path}")
print(f"Files parsed : {len(input_files)}")
print(f"Rows appended : {len(all_records)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())