electrical.collect_resitest8300 のソースコード

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hall測定装置のCSVファイルから、多層モデル解析に使いやすい primary データをExcelに追記するプログラム。

:doc:`collect_resitest8300_usage`

このプログラムは、Resitest 8300型などのHall測定結果CSVファイルから、
比抵抗、Hall係数、キャリア濃度、移動度などのデータを抽出し、
整形された「primary」データとしてExcelシートに追記します。
また、処理したファイルに関する情報も別のシートに記録します。

想定している入力:
  - 8310型などの Hall 測定結果CSV
  - cp932 / Shift_JIS 系の日本語CSV
  - 1ファイル中に複数温度・複数測定No.のブロックが入っている形式

基本方針:
  - 第一引数: 出力先 Excel ファイル
  - 第二引数以降: CSVファイルまたはワイルドカード
  - Excel が存在しなければ新規作成
  - Excel が存在すれば単純追記
  - 重複チェック・置換はしない
  - Hall係数は carrier type が N/P と判定できる場合のみ符号付きに変換
  - シートHall係数の再計算値は出力しない
  - 装置CSV中の平均値・最小・最大から半幅誤差と相対誤差を出力

使い方:
    python collect_hall_primary.py Hall_summary.xlsx "*.CSV"

例:
    python collect_hall_primary.py Hall_summary.xlsx "D:\\data\\*.CSV"
    python collect_hall_primary.py Hall_summary.xlsx "*.CSV" "subdir\\*.csv"
    python collect_hall_primary.py Hall_summary.xlsx "**\\*.CSV" --recursive
    python collect_hall_primary.py Hall_summary.xlsx "*.CSV" --dry-run

必要ライブラリ:
    pip install openpyxl
"""

from __future__ import annotations

import argparse
import csv
import datetime as _dt
import glob
import math
import re
import sys
import unicodedata
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple


FLOAT_RE = re.compile(
    r"[-+]?(?:(?:\d+\.\d*)|(?:\.\d+)|(?:\d+))(?:[Ee][+-]?\d+)?"
)


PRIMARY_COLUMNS = [
    "source_file",
    "source_basename",
    "imported_at",
    "sample_name",
    "measurement_date",
    "measurement_time",
    "measurement_datetime",
    "operator",
    "comment",
    "measurement_no",
    "thickness_value",
    "thickness_unit",
    "thickness_cm",
    "rho_T_K",
    "hall_T_K",
    "rho_start_time",
    "rho_end_time",
    "hall_start_time",
    "hall_end_time",
    "rho_set_current_A",
    "hall_set_current_A",
    "hall_measured_current_A",
    "hall_measured_current_min_A",
    "hall_measured_current_max_A",
    "hall_B_T",
    "sheet_resistance_ohm_sq",
    "sheet_conductance_S_sq",
    "rho_ohm_cm",
    "sigma_S_cm",
    "hall_voltage_V",
    "hall_phase_deg",
    "hall_noise_V",
    "hall_noise_percent",
    "hall_drift_percent",
    "hall_direction_dependence_percent",
    "carrier_type_raw",
    "hall_sign_status",
    "hall_coeff_cm3_C",
    "hall_coeff_min_cm3_C",
    "hall_coeff_max_cm3_C",
    "hall_coeff_err_cm3_C",
    "hall_coeff_err_percent",
    "carrier_density_cm3",
    "carrier_density_min_cm3",
    "carrier_density_max_cm3",
    "carrier_density_err_cm3",
    "carrier_density_err_percent",
    "sheet_carrier_density_cm2",
    "sheet_carrier_density_min_cm2",
    "sheet_carrier_density_max_cm2",
    "sheet_carrier_density_err_cm2",
    "sheet_carrier_density_err_percent",
    "mobility_cm2_Vs",
    "mobility_min_cm2_Vs",
    "mobility_max_cm2_Vs",
    "mobility_err_cm2_Vs",
    "mobility_err_percent",
    "rho_noise_percent",
    "rho_F_value",
    "quality_flag",
    "parser_warning",
]

FILES_COLUMNS = [
    "source_file",
    "source_basename",
    "file_size",
    "modified_time",
    "imported_at",
    "sample_name",
    "measurement_date",
    "measurement_time",
    "measurement_datetime",
    "operator",
    "comment",
    "thickness_value",
    "thickness_unit",
    "thickness_cm",
    "num_measurements",
    "parser_warning",
]


# CSV中のパラメータ名 -> (列名ベース, 単位サフィックス, Hall係数として符号変換するか)
PARAM_MAP = {
    "ホール係数": ("hall_coeff", "cm3_C", True),
    "キャリア濃度": ("carrier_density", "cm3", False),
    "シートキャリア濃度": ("sheet_carrier_density", "cm2", False),
    "移動度": ("mobility", "cm2_Vs", False),
}


[ドキュメント] def norm(s: Any) -> str: """全角英数字や半角カナを比較しやすい形に正規化する。 詳細説明: `unicodedata.normalize("NFKC", ...)` を使用して、 互換等価性を持つ文字を正規化し、前後の空白を除去します。 これにより、例えば「ABC」や「アイウ」のような文字が 「ABC」や「アイウ」に変換され、比較やパースが容易になります。 :param s: 正規化する任意の入力値。 :returns: 正規化された文字列。 """ return unicodedata.normalize("NFKC", str(s or "")).strip()
[ドキュメント] def clean_cell(s: Any) -> str: """CSVセルのNUL文字と前後空白を除去する。 詳細説明: CSVファイルによっては、内部にNUL文字 (\\x00) が含まれていることがあり、 これを削除することで文字列処理の問題を回避します。 また、前後の空白も `strip()` メソッドで除去します。 入力が `None` の場合は空文字列を返します。 :param s: CSVセルから読み取られた値。 :returns: クリーンアップされた文字列。 """ if s is None: return "" return str(s).replace("\x00", "").strip()
[ドキュメント] def parse_float(s: Any) -> Optional[float]: """セル文字列から最初の数値を取り出す。 詳細説明: 装置CSVでは `* 1.23E+4`, `# -3.0E-2` のような警告記号付き数値が 出力されることがあります。この関数は、単純な `float()` 変換ではなく、 正規表現 `FLOAT_RE` を用いて文字列中の最初の数値部分を抽出し、 それを浮動小数点数として解析します。 また、カンマ(`,`)や全角ハイフン(`−`)も除去してパースを試みます。 :param s: 数値を含む可能性のある文字列または任意のオブジェクト。 :returns: 抽出された浮動小数点数、または数値が見つからない場合は None。 """ t = norm(str(s or "").replace("\x00", "")) t = t.replace(",", "") t = t.replace("−", "-") m = FLOAT_RE.search(t) if not m: return None try: return float(m.group(0)) except ValueError: return None
[ドキュメント] def parse_unit(s: Any) -> str: """[ mm ] のような単位表記を mm に整形する。 詳細説明: 入力された単位文字列から、角括弧 `[]` や全角の `[]` を除去し、 さらにマイクロ記号 `μ` や `µ` を `um` に統一して、 単位を比較しやすい標準的な形式に整形します。 :param s: 単位を表す文字列。 :returns: 整形された単位文字列。 """ u = norm(s) for ch in "[][]": u = u.replace(ch, "") u = u.replace("μ", "um").replace("µ", "um") return u.strip()
[ドキュメント] def thickness_to_cm(value: Optional[float], unit: str) -> Optional[float]: """厚さを cm に換算する。未知単位なら None。 詳細説明: 入力された厚さの値と単位から、その厚さをセンチメートル (cm) に換算します。 以下の単位に対応しています。 - cm: 1.0 - mm: 1.0e-1 - um (μm): 1.0e-4 - nm: 1.0e-7 - m: 100.0 - A, Å, ang, angstrom: 1.0e-8 対応していない単位が指定された場合、または `value` が None の場合は None を返します。 :param value: 厚さの数値。 :param unit: 厚さの単位を表す文字列。 :returns: cm に換算された厚さ、または換算できない場合は None。 """ if value is None: return None u = norm(unit).lower().replace("μ", "u").replace("µ", "u") u = u.replace(" ", "") mapping = { "cm": 1.0, "mm": 1.0e-1, "um": 1.0e-4, "nm": 1.0e-7, "m": 100.0, "a": 1.0e-8, "å": 1.0e-8, "ang": 1.0e-8, "angstrom": 1.0e-8, } factor = mapping.get(u) if factor is None: return None return value * factor
[ドキュメント] def read_csv_rows(path: Path, encoding: str = "cp932") -> List[List[str]]: """CSVを読み込み、セル配列として返す。 詳細説明: 指定されたパスのCSVファイルを、指定されたエンコーディングで読み込みます。 読み込み中に発生するデコードエラーは `replace` モードで処理され、 NUL文字 (\\x00) は除去されます。 各セルは `clean_cell` 関数で整形され、文字列のリストのリストとして返されます。 :param path: 読み込むCSVファイルのパス。 :param encoding: CSVファイルの文字エンコーディング。デフォルトは "cp932"。 :returns: CSVの内容を表す文字列のリストのリスト。 """ raw = path.read_bytes() text = raw.decode(encoding, errors="replace").replace("\x00", "") return [[clean_cell(c) for c in row] for row in csv.reader(text.splitlines())]
[ドキュメント] def first_nonempty(row: Sequence[str]) -> str: """行の最初の空でないセルを返す。 詳細説明: 指定された行 (文字列のシーケンス) を先頭から順に走査し、 最初に見つかった空でない文字列を返します。 行が空であるか、すべてのセルが空文字列である場合は空文字列を返します。 :param row: 文字列のシーケンス(CSVの1行に相当)。 :returns: 行の最初の空でないセル、または空文字列。 """ for c in row: if c: return c return ""
[ドキュメント] def row_label(row: Sequence[str]) -> str: """行のラベル(最初のセル)を正規化して返す。 詳細説明: CSVの行の最初のセルをラベルとみなし、`norm` 関数で正規化します。 これにより、ラベルの比較やキーとしての使用が容易になります。 行が空の場合は空文字列を返します。 :param row: 文字列のシーケンス(CSVの1行に相当)。 :returns: 正規化された行ラベル、または空文字列。 """ return norm(row[0]) if row else ""
[ドキュメント] def split_blocks(rows: List[List[str]]) -> Tuple[List[List[str]], List[Tuple[str, List[List[str]]]]]: """CSVの行データを `<<<...>>>` ブロックに分割する。 詳細説明: Hall測定装置のCSVファイルは、ファイル冒頭の共通情報と、 `<<<比抵抗測定結果>>>` や `<<<AC磁場ホール測定結果>>>` のような ブロックで構成されています。この関数は、これらのブロックマーカーを 識別し、CSVデータをヘッダー行とタイトル付きのブロックに分割します。 最初の `<<<...>>>` が現れるまでの行はヘッダー行として扱われます。 :param rows: CSVファイルから読み込まれたすべての行データ。 :returns: ヘッダー行のリストと、(ブロックタイトル, ブロック内の行データ) のタプルのリスト。 """ header_rows: List[List[str]] = [] blocks: List[Tuple[str, List[List[str]]]] = [] current_title: Optional[str] = None current_rows: List[List[str]] = [] for row in rows: first = first_nonempty(row) nfirst = norm(first) if nfirst.startswith("<<<") and nfirst.endswith(">>>"): if current_title is None: header_rows.extend(current_rows) else: blocks.append((current_title, current_rows)) current_title = first current_rows = [] else: current_rows.append(row) if current_title is None: header_rows.extend(current_rows) else: blocks.append((current_title, current_rows)) return header_rows, blocks
[ドキュメント] def parse_global_info(header_rows: List[List[str]]) -> Dict[str, Any]: """ファイル冒頭の共通情報を読む。 詳細説明: CSVファイルのヘッダー部分から、試料名、測定年月日、測定者、コメント、 試料厚みなどのグローバルな情報を抽出します。 試料厚みについては、値と単位をパースし、センチメートルへの換算も行います。 抽出された情報は辞書として返されます。 :param header_rows: ファイル冒頭のヘッダー行のリスト。 :returns: 抽出されたグローバル情報を含む辞書。 """ info: Dict[str, Any] = {} for row in header_rows: lbl = row_label(row) if "試料名" in lbl: info["sample_name"] = row[1] if len(row) > 1 else "" elif "測定年月日" in lbl: date = row[1] if len(row) > 1 else "" time = row[2] if len(row) > 2 else "" info["measurement_date"] = date info["measurement_time"] = time info["measurement_datetime"] = (date + " " + time).strip() elif "測定者" in lbl: info["operator"] = row[1] if len(row) > 1 else "" elif "コメント" in lbl: info["comment"] = row[1] if len(row) > 1 else "" elif "試料厚み" in lbl or "試料厚さ" in lbl: value = parse_float(row[1] if len(row) > 1 else "") unit = parse_unit(row[2] if len(row) > 2 else "") info["thickness_value"] = value info["thickness_unit"] = unit info["thickness_cm"] = thickness_to_cm(value, unit) return info
[ドキュメント] def parse_common_block_items(block_rows: List[List[str]]) -> Dict[str, Any]: """比抵抗・Hallの各ブロック冒頭に共通する情報を読む。 詳細説明: 比抵抗測定結果ブロックやホール測定結果ブロックの冒頭に共通して現れる、 測定No、測定温度、測定開始/終了時刻、設定電流などの情報を抽出します。 これらの情報は、各測定レコードの基本情報として使用されます。 :param block_rows: 特定の測定ブロック内の行データ。 :returns: 抽出された共通情報を含む辞書。 """ d: Dict[str, Any] = {} for row in block_rows: lbl = row_label(row) if lbl.startswith("測定No") or "測定No" in lbl: v = parse_float(row[1] if len(row) > 1 else "") d["measurement_no"] = int(v) if v is not None else None elif lbl == "測定温度": d["T_K"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "測定開始時刻": d["start_time"] = row[1] if len(row) > 1 else "" elif lbl == "測定終了時刻": d["end_time"] = row[1] if len(row) > 1 else "" elif lbl == "設定電流": d["set_current_A"] = parse_float(row[1] if len(row) > 1 else "") return d
[ドキュメント] def parse_rho_result(block_rows: List[List[str]]) -> Dict[str, Any]: """`<<<比抵抗測定結果>>>` ブロックを解析する。 詳細説明: 比抵抗測定結果ブロックから、F値、比抵抗値、シート抵抗値、ノイズ などの測定結果を抽出します。 共通情報 (`parse_common_block_items` から得られるもの) も取り込み、 適切なキー名で辞書に格納します。 :param block_rows: 比抵抗測定結果ブロック内の行データ。 :returns: 抽出された比抵抗測定結果を含む辞書。 """ common = parse_common_block_items(block_rows) d: Dict[str, Any] = {} if common.get("measurement_no") is not None: d["measurement_no"] = common["measurement_no"] d["rho_T_K"] = common.get("T_K") d["rho_start_time"] = common.get("start_time", "") d["rho_end_time"] = common.get("end_time", "") d["rho_set_current_A"] = common.get("set_current_A") for row in block_rows: lbl = row_label(row) if lbl == "F値": d["rho_F_value"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "比抵抗値": d["rho_ohm_cm"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "シート抵抗値": d["sheet_resistance_ohm_sq"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "ノイズ": unit = parse_unit(row[2] if len(row) > 2 else "") if "%" in unit or "%" in unit: d["rho_noise_percent"] = parse_float(row[1] if len(row) > 1 else "") return d
[ドキュメント] def detect_carrier_sign(carrier_type_raw: str) -> Tuple[Optional[int], str]: """キャリアタイプからHall係数の符号を決定する。 詳細説明: 入力されたキャリアタイプ文字列を解析し、Hall係数の符号を決定します。 - "N" を含む場合は N型と判断し、符号 `-1` を返します。 - "P" を含む場合は P型と判断し、符号 `+1` を返します。 - 上記以外の場合は符号を決定できないため `None` を返します。 また、キャリアタイプの状態を示す文字列 ("N", "P", "unknown") も返します。 :param carrier_type_raw: CSVから読み取られたキャリアタイプ文字列。 :returns: Hall係数の符号 (1, -1, または None) と、キャリアタイプの状態を示す文字列のタプル。 """ t = norm(carrier_type_raw).upper() if "N" in t: return -1, "N" if "P" in t: return +1, "P" return None, "unknown"
[ドキュメント] def normalized_param_label(label: str) -> str: """パラメータラベルを正規化する。 詳細説明: 入力されたパラメータラベルを `norm` 関数で正規化し、さらに空白を除去します。 これにより、CSV内の様々な表記揺れに対応し、ラベルの一貫した比較を可能にします。 :param label: 正規化するパラメータラベル文字列。 :returns: 正規化・空白除去されたパラメータラベル文字列。 """ return norm(label).replace(" ", "")
[ドキュメント] def add_param_stats( d: Dict[str, Any], base: str, unit_suffix: str, avg: Optional[float], vmin: Optional[float], vmax: Optional[float], *, signed: bool = False, sign: Optional[int] = None, ) -> None: """平均・最小・最大から半幅誤差と相対誤差を計算し辞書に追加する。 詳細説明: 測定値の平均値 (avg)、最小値 (vmin)、最大値 (vmax) を基に、 半幅誤差 (err) と相対誤差 (err_percent) を計算し、 これらを指定されたベース名と単位サフィックスを用いて辞書 `d` に追加します。 Hall係数 (signed=True) の場合、装置出力が絶対値で出ていると仮定し、 `sign` 引数 (N型:-1, P型:+1) に応じて符号を付与します。 N型の場合、min/maxは数直線上で min=-max_abs, max=-min_abs として保存されます。 符号変換が必要な場合、`sign` が `None` でない場合に適用されます。 :param d: 結果を追加する辞書。 :param base: パラメータのベース名 (例: "hall_coeff")。 :param unit_suffix: 単位のサフィックス (例: "cm3_C")。 :param avg: パラメータの平均値。 :param vmin: パラメータの最小値。 :param vmax: パラメータの最大値。 :param signed: Hall係数などのように符号付きで扱うべきかどうかのフラグ。 :param sign: `signed` が True の場合に適用する符号 (1 または -1)。 :returns: None """ if signed and sign is not None: if avg is not None: avg = sign * abs(avg) if vmin is not None and vmax is not None: a = abs(vmin) b = abs(vmax) if sign < 0: vmin, vmax = -max(a, b), -min(a, b) else: vmin, vmax = min(a, b), max(a, b) else: if vmin is not None: vmin = sign * abs(vmin) if vmax is not None: vmax = sign * abs(vmax) d[f"{base}_{unit_suffix}"] = avg d[f"{base}_min_{unit_suffix}"] = vmin d[f"{base}_max_{unit_suffix}"] = vmax if vmin is not None and vmax is not None: err = abs(vmax - vmin) / 2.0 else: err = None d[f"{base}_err_{unit_suffix}"] = err if err is not None and avg not in (None, 0): d[f"{base}_err_percent"] = abs(err / avg) * 100.0 else: d[f"{base}_err_percent"] = None
[ドキュメント] def estimate_hall_measured_current(block_rows: List[List[str]]) -> Dict[str, Optional[float]]: """Hall測定結果ブロック内の端子間テーブルから実測電流の代表値を推定する。 詳細説明: Hall測定結果ブロック内の「端子間ホール起電圧」のテーブルより前の行で、 「電流 [A], 電圧 [V], ... 電流 [A], 電圧 [V], ...」のような形式で 記録されている実測電流値を抽出します。 抽出された電流値の絶対値の平均、最小値、最大値を計算して返します。 電流値として妥当な範囲 (絶対値が 1.0 A 未満) のみを採用し、 誤って温度や磁場などを拾わないようにします。 :param block_rows: Hall測定結果ブロック内の行データ。 :returns: 推定された実測電流の平均値、最小値、最大値を含む辞書。 """ current_values: List[float] = [] for row in block_rows: joined = norm("".join(row)) if "端子間ホール起電圧" in joined: break # 「電流 [A], 電圧 [V], ... 電流 [A], 電圧 [V], ...」の表で # 電流はおおむね 0列目と4列目に入る。 for idx in (0, 4): if len(row) <= idx: continue v = parse_float(row[idx]) if v is None: continue # 電流値として妥当な範囲だけ採用。温度や磁場などの誤拾いを避ける。 if abs(v) < 1.0: current_values.append(v) if not current_values: return { "hall_measured_current_A": None, "hall_measured_current_min_A": None, "hall_measured_current_max_A": None, } return { "hall_measured_current_A": sum(abs(x) for x in current_values) / len(current_values), "hall_measured_current_min_A": min(current_values), "hall_measured_current_max_A": max(current_values), }
[ドキュメント] def parse_hall_result(block_rows: List[List[str]]) -> Dict[str, Any]: """`<<<AC磁場ホール測定結果>>>` ブロックを解析する。 詳細説明: Hall測定結果ブロックから、磁場、ホール起電圧、位相、ノイズ、ドリフト率、 方向依存性、キャリアタイプ、Hall係数、キャリア濃度、移動度などの 測定結果を抽出します。 共通情報 (`parse_common_block_items` から得られるもの) も取り込み、 適切なキー名で辞書に格納します。 キャリアタイプに基づいてHall係数の符号を決定し、必要に応じて符号付きに変換します。 パラメータの平均値、最小値、最大値からは半幅誤差と相対誤差も計算されます。 解析中に検出された警告は `parser_warning` に格納されます。 :param block_rows: Hall測定結果ブロック内の行データ。 :returns: 抽出されたHall測定結果を含む辞書。 """ common = parse_common_block_items(block_rows) d: Dict[str, Any] = {} warnings: List[str] = [] if common.get("measurement_no") is not None: d["measurement_no"] = common["measurement_no"] d["hall_T_K"] = common.get("T_K") d["hall_start_time"] = common.get("start_time", "") d["hall_end_time"] = common.get("end_time", "") d["hall_set_current_A"] = common.get("set_current_A") for row in block_rows: lbl = row_label(row) if lbl == "磁場": d["hall_B_T"] = parse_float(row[1] if len(row) > 1 else "") d.update(estimate_hall_measured_current(block_rows)) in_total = False carrier_type_raw = "" for row in block_rows: lbl = row_label(row) joined = norm("".join(row)) if "<総合結果>" in joined or "< 総合結果 >" in "".join(row): in_total = True continue if not in_total: continue if lbl == "ホール起電圧": d["hall_voltage_V"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "位相": d["hall_phase_deg"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "ノイズ": unit = parse_unit(row[2] if len(row) > 2 else "") if "V" in unit.upper(): d["hall_noise_V"] = parse_float(row[1] if len(row) > 1 else "") elif "%" in unit or "%" in unit: d["hall_noise_percent"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "ドリフト率": d["hall_drift_percent"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "方向依存性": d["hall_direction_dependence_percent"] = parse_float(row[1] if len(row) > 1 else "") elif lbl == "キャリアタイプ": carrier_type_raw = row[1] if len(row) > 1 else "" d["carrier_type_raw"] = carrier_type_raw _sign, status = detect_carrier_sign(carrier_type_raw) d["hall_sign_status"] = status sign, status = detect_carrier_sign(carrier_type_raw) if not carrier_type_raw: d.setdefault("carrier_type_raw", "") d["hall_sign_status"] = "missing" warnings.append("carrier_type missing") elif sign is None: d["hall_sign_status"] = "unknown" warnings.append("carrier_type unknown; hall coefficient left as reported") for row in block_rows: lbl = normalized_param_label(row_label(row)) for key, (base, unit_suffix, is_signed) in PARAM_MAP.items(): if lbl == key: avg = parse_float(row[1] if len(row) > 1 else "") vmin = parse_float(row[2] if len(row) > 2 else "") vmax = parse_float(row[3] if len(row) > 3 else "") if avg is None: warnings.append(f"{base} average missing") if vmin is None: warnings.append(f"{base} min missing") if vmax is None: warnings.append(f"{base} max missing") add_param_stats( d, base, unit_suffix, avg, vmin, vmax, signed=is_signed, sign=sign, ) break d["parser_warning"] = "; ".join(warnings) return d
[ドキュメント] def make_quality_flag(record: Dict[str, Any]) -> str: """Hallデータの簡易品質フラグを作成する。 詳細説明: これは最終判断ではなく、Excelで目視確認するための目印です。 以下の基準でフラグを決定します。 - ホールノイズまたは方向依存性が50%以上: "bad" - `parser_warning` に "missing" が含まれる場合: "warning" - `parser_warning` に "unknown" が含まれる場合: "warning" - ホールノイズまたは方向依存性が10%以上: "warning" - ホールノイズと方向依存性の両方が値がない場合: "warning" - それ以外の場合: "good" :param record: 1つの測定レコードを含む辞書。 :returns: 簡易品質フラグ ("good", "warning", "bad")。 """ warning_text = str(record.get("parser_warning") or "") noise = record.get("hall_noise_percent") dep = record.get("hall_direction_dependence_percent") values = [v for v in (noise, dep) if isinstance(v, (int, float))] if any(v >= 50 for v in values): return "bad" if "missing" in warning_text: return "warning" if "unknown" in warning_text: return "warning" if any(v >= 10 for v in values): return "warning" if not values: return "warning" return "good"
[ドキュメント] def build_records_from_csv(path: Path, encoding: str = "cp932") -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: """1つのCSVファイルからExcel出力用レコードを作成する。 詳細説明: 指定されたCSVファイル全体を読み込み、ファイル冒頭のグローバル情報と、 各測定ブロック(比抵抗、Hall)の情報を抽出して、 Excelの「primary」シートと「files」シートに書き込むための レコードのリストを生成します。 同一の `measurement_no` を持つ比抵抗データとHallデータは結合されます。 インポート日時、シートコンダクタンス、品質フラグなども計算・追加されます。 ファイル全体で発生した警告は `global_info` に集約されます。 :param path: 解析対象のCSVファイルのパス。 :param encoding: CSVファイルの文字エンコーディング。デフォルトは "cp932"。 :returns: (primaryデータのレコードリスト, ファイル全体のグローバル情報を含む辞書) のタプル。 """ rows = read_csv_rows(path, encoding=encoding) header_rows, blocks = split_blocks(rows) global_info = parse_global_info(header_rows) by_no: Dict[int, Dict[str, Any]] = {} file_warnings: List[str] = [] for title, block_rows in blocks: nt = norm(title) if "比抵抗測定結果" in nt and "詳細" not in nt: d = parse_rho_result(block_rows) no = d.get("measurement_no") if no is None: file_warnings.append("rho block without measurement_no") continue by_no.setdefault(int(no), {}).update(d) elif ("AC磁場ホール測定結果" in nt or "磁場ホール測定結果" in nt) and "詳細" not in nt: d = parse_hall_result(block_rows) no = d.get("measurement_no") if no is None: file_warnings.append("hall block without measurement_no") continue # 既存のparser_warningとマージ old = by_no.setdefault(int(no), {}).get("parser_warning", "") new = d.get("parser_warning", "") if old and new: d["parser_warning"] = old + "; " + new elif old: d["parser_warning"] = old by_no.setdefault(int(no), {}).update(d) imported_at = _dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") out: List[Dict[str, Any]] = [] for no in sorted(by_no): rec: Dict[str, Any] = {} rec.update(global_info) rec.update(by_no[no]) rec["source_file"] = str(path) rec["source_basename"] = path.name rec["imported_at"] = imported_at # 厚さ非依存の解析に便利なシートコンダクタンス。 # シートHall係数はユーザ確認用の再計算値になるため出力しない。 Rs = rec.get("sheet_resistance_ohm_sq") rho = rec.get("rho_ohm_cm") if isinstance(Rs, (int, float)) and Rs != 0: rec["sheet_conductance_S_sq"] = 1.0 / Rs if isinstance(rho, (int, float)) and rho != 0: rec["sigma_S_cm"] = 1.0 / rho rec["quality_flag"] = make_quality_flag(rec) out.append(rec) global_info = dict(global_info) global_info["parser_warning"] = "; ".join(file_warnings) global_info["imported_at"] = imported_at return out, global_info
[ドキュメント] def expand_input_patterns(patterns: Sequence[str], recursive: bool = False) -> List[Path]: """WindowsでもPython側でワイルドカード展開する。 詳細説明: コマンドライン引数で渡されたファイルパスのパターン(ワイルドカードを含む) を展開し、実際に存在するファイルパスのリストを返します。 Windows環境ではシェルがワイルドカードを展開しないため、 `glob.glob` を使用してPython側で展開を行います。 `recursive` フラグが True の場合、`**` を使用した再帰的な検索を有効にします。 重複するファイルは `set` を使用して排除されます。 :param patterns: ワイルドカードを含む可能性のあるファイルパスのリスト。 :param recursive: `**` を使用した再帰検索を有効にするかどうかのフラグ。 :returns: 展開された、存在するファイルパスの `Path` オブジェクトのリスト。 """ files: List[Path] = [] seen = set() for pat in patterns: p = Path(pat) if p.exists() and p.is_file(): candidates = [str(p)] else: candidates = glob.glob(pat, recursive=recursive) for c in candidates: cp = Path(c) if not cp.is_file(): continue key = str(cp.resolve()) if key not in seen: seen.add(key) files.append(cp) return files
[ドキュメント] def safe_excel_value(v: Any) -> Any: """openpyxlに渡せる値に整える。 詳細説明: Excelのセルに書き込む値として不適切な値(`None`、`NaN`、`Infinity`)を `None` に変換します。 openpyxlライブラリは `NaN` や `Infinity` を直接書き込むとエラーになるため、 これらの値を安全に処理するためにこの関数を使用します。 :param v: Excelセルに書き込む可能性のある値。 :returns: openpyxlに安全に渡せる値。不適切な場合は None。 """ if v is None: return None if isinstance(v, float) and (math.isnan(v) or math.isinf(v)): return None return v
[ドキュメント] def import_openpyxl(): """openpyxlライブラリをインポートする。 詳細説明: openpyxlとその主要なクラス、関数をインポートします。 openpyxlがインストールされていない場合はエラーメッセージを表示し、 `SystemExit` を発生させてプログラムを終了します。 :returns: openpyxlモジュールとその主要なクラス、関数のタプル。 :raises SystemExit: openpyxlがインストールされていない場合。 """ try: import openpyxl from openpyxl import Workbook, load_workbook from openpyxl.styles import Font, PatternFill, Alignment from openpyxl.utils import get_column_letter except ImportError as exc: print("ERROR: openpyxl が必要です。次のコマンドでインストールしてください:", file=sys.stderr) print(" pip install openpyxl", file=sys.stderr) raise SystemExit(2) from exc return openpyxl, Workbook, load_workbook, Font, PatternFill, Alignment, get_column_letter
[ドキュメント] def ensure_sheet_with_headers(wb: Any, sheet_name: str, columns: List[str]) -> Tuple[Any, List[str]]: """Excelシートを用意し、既存ヘッダに不足列を追加する。 詳細説明: 指定されたワークブック `wb` 内に `sheet_name` のシートが存在しない場合は新規作成します。 シートが存在する場合、または新規作成された場合、最初の行をヘッダー行として扱います。 指定された `columns` のリストに存在するが、既存ヘッダーにはない列がある場合、 その列をヘッダーの最後に追加します。 これにより、新しいデータに新しい列が含まれていても、Excelシートのヘッダーが適切に更新されます。 :param wb: openpyxlのWorkbookオブジェクト。 :param sheet_name: 操作対象のシート名。 :param columns: シートに含めるべき列名のリスト(出力カラム順の定義)。 :returns: (操作対象のWorksheetオブジェクト, 更新後のヘッダー列名のリスト) のタプル。 """ if sheet_name in wb.sheetnames: ws = wb[sheet_name] else: ws = wb.create_sheet(sheet_name) if ws.max_row < 1 or all(ws.cell(row=1, column=c).value in (None, "") for c in range(1, ws.max_column + 1)): # シートが完全に空かヘッダー行が空の場合、指定されたカラムでヘッダーを作成 headers = list(columns) for i, h in enumerate(headers, start=1): ws.cell(row=1, column=i).value = h return ws, headers # 既存のヘッダーを読み込む headers = [ws.cell(row=1, column=c).value for c in range(1, ws.max_column + 1)] headers = [str(h) for h in headers if h not in (None, "")] # 不足しているカラムを追加 for col in columns: if col not in headers: headers.append(col) ws.cell(row=1, column=len(headers)).value = col return ws, headers
[ドキュメント] def append_records(ws: Any, headers: List[str], records: List[Dict[str, Any]]) -> None: """指定されたレコードをExcelシートに追記する。 詳細説明: 各レコード (辞書) を、指定された `headers` の順序に従って Excelシート `ws` の新しい行として追加します。 `safe_excel_value` 関数を使用して、Excelに安全な値のみが書き込まれるようにします。 :param ws: データを追記するopenpyxlのWorksheetオブジェクト。 :param headers: シートのヘッダー列名のリスト。この順序でレコードが書き込まれる。 :param records: 追記するレコードのリスト。各レコードは辞書形式。 :returns: None """ for rec in records: row = [safe_excel_value(rec.get(h)) for h in headers] ws.append(row)
[ドキュメント] def style_sheet(ws: Any) -> None: """Excelシートのスタイルを設定する。 詳細説明: シートのヘッダー行に背景色、太字フォント、中央揃えを適用し、 A2セルでのウィンドウ固定とオートフィルターを設定します。 また、列幅をヘッダーの内容や特定のキーに基づいて自動調整します。 数値を含む列には、データの種類に応じて適切な数値書式(指数表記、整数、小数点以下3桁) を適用します。 :param ws: スタイルを設定するopenpyxlのWorksheetオブジェクト。 :returns: None """ _openpyxl, _Workbook, _load_workbook, Font, PatternFill, Alignment, get_column_letter = import_openpyxl() header_fill = PatternFill("solid", fgColor="D9EAF7") header_font = Font(bold=True) ws.freeze_panes = "A2" if ws.max_column > 0 and ws.max_row > 1: ws.auto_filter.ref = ws.dimensions for cell in ws[1]: cell.fill = header_fill cell.font = header_font cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True) for col_idx in range(1, ws.max_column + 1): letter = get_column_letter(col_idx) header = str(ws.cell(row=1, column=col_idx).value or "") if header in ("source_file", "parser_warning", "comment"): width = 42 elif header in ("source_basename", "measurement_datetime"): width = 28 elif "time" in header or "date" in header: width = 18 elif header in ("sample_name",): width = 22 else: width = max(12, min(24, len(header) + 2)) ws.column_dimensions[letter].width = width # 数値列の書式。幅広い桁を扱うので指数表記を基本にする。 numeric_headers = { h for h in [ws.cell(row=1, column=c).value for c in range(1, ws.max_column + 1)] if isinstance(h, str) and ( h.endswith("_A") or h.endswith("_V") or h.endswith("_T") or h.endswith("_K") or h.endswith("_cm") or h.endswith("_cm2") or h.endswith("_cm3") or h.endswith("_cm3_C") or h.endswith("_cm2_Vs") or h.endswith("_ohm_cm") or h.endswith("_ohm_sq") or h.endswith("_S_sq") or h.endswith("_S_cm") or h.endswith("_percent") or h in ("rho_F_value", "measurement_no", "thickness_value") ) } header_to_col = { ws.cell(row=1, column=c).value: c for c in range(1, ws.max_column + 1) } for h in numeric_headers: col = header_to_col.get(h) if not col: continue for row in range(2, ws.max_row + 1): cell = ws.cell(row=row, column=col) if isinstance(cell.value, (int, float)): if h in ("measurement_no",): cell.number_format = "0" elif h.endswith("_percent") or h == "rho_F_value": cell.number_format = "0.000" else: cell.number_format = "0.000000E+00"
[ドキュメント] def append_to_excel( excel_path: Path, primary_records: List[Dict[str, Any]], file_records: List[Dict[str, Any]], *, primary_sheet: str = "primary", files_sheet: str = "files", write_files_sheet: bool = True, ) -> None: """Excelファイルに測定結果とファイル履歴を追記する。 詳細説明: 指定されたExcelファイル `excel_path` に、 `primary_records` を `primary_sheet` に、 `file_records` を `files_sheet` に追記します。 Excelファイルが存在しない場合は新規作成され、 存在する場合は既存のデータに追記されます。 各シートは `ensure_sheet_with_headers` でヘッダーが適切に管理され、 `style_sheet` で書式が設定されます。 :param excel_path: 出力先となるExcelファイルのパス。 :param primary_records: primaryシートに追記する測定レコードのリスト。 :param file_records: filesシートに追記するファイル履歴レコードのリスト。 :param primary_sheet: primaryデータを書き込むシート名。デフォルトは "primary"。 :param files_sheet: ファイル履歴を書き込むシート名。デフォルトは "files"。 :param write_files_sheet: filesシートを作成・追記するかどうかのフラグ。 :returns: None """ _openpyxl, Workbook, load_workbook, *_ = import_openpyxl() if excel_path.exists(): wb = load_workbook(excel_path) else: wb = Workbook() # デフォルトSheetを後で不要なら削除する if wb.active and wb.active.title == "Sheet": wb.remove(wb.active) ws, headers = ensure_sheet_with_headers(wb, primary_sheet, PRIMARY_COLUMNS) append_records(ws, headers, primary_records) style_sheet(ws) if write_files_sheet: ws_files, file_headers = ensure_sheet_with_headers(wb, files_sheet, FILES_COLUMNS) append_records(ws_files, file_headers, file_records) style_sheet(ws_files) excel_path.parent.mkdir(parents=True, exist_ok=True) wb.save(excel_path)
[ドキュメント] def make_file_record(path: Path, global_info: Dict[str, Any], num_measurements: int) -> Dict[str, Any]: """1つのCSVファイルに関する履歴レコードを作成する。 詳細説明: 指定されたファイルパス `path` と、そのファイルから抽出された グローバル情報 `global_info` を基に、ファイル履歴シート (`files_sheet`) に書き込むためのレコードを作成します。 ファイルサイズ、最終更新時刻、解析時に検出された警告なども含まれます。 :param path: 処理対象のCSVファイルのパス。 :param global_info: CSVファイルから抽出されたグローバル情報を含む辞書。 :param num_measurements: そのファイルから抽出された測定レコードの数。 :returns: ファイル履歴レコードを含む辞書。 """ stat = path.stat() rec: Dict[str, Any] = { "source_file": str(path), "source_basename": path.name, "file_size": stat.st_size, "modified_time": _dt.datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), "num_measurements": num_measurements, } for key in ( "imported_at", "sample_name", "measurement_date", "measurement_time", "measurement_datetime", "operator", "comment", "thickness_value", "thickness_unit", "thickness_cm", "parser_warning", ): rec[key] = global_info.get(key) return rec
[ドキュメント] def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace: """コマンドライン引数を解析する。 詳細説明: プログラムの実行時に渡されるコマンドライン引数を解析し、 設定値を含む `argparse.Namespace` オブジェクトを返します。 出力Excelファイル、入力CSVファイル、エンコーディング、再帰検索、 ドライランモード、シート名、verboseモードなどのオプションを定義しています。 :param argv: コマンドライン引数のリスト (テスト目的などでNone以外の値を渡す)。デフォルトはsys.argv[1:]。 :returns: 解析された引数を含む `argparse.Namespace` オブジェクト。 """ parser = argparse.ArgumentParser( description="Hall測定CSVから多層モデル解析用primaryデータをExcelに追記する。" ) parser.add_argument( "excel_file", help="出力先Excelファイル。存在しなければ新規作成、存在すれば追記。", ) parser.add_argument( "data_files", nargs="+", help="入力CSVファイルまたはワイルドカード。例: *.CSV \"subdir\\*.csv\"", ) parser.add_argument( "--encoding", default="cp932", help="入力CSVの文字コード。デフォルト: cp932", ) parser.add_argument( "--recursive", action="store_true", help="globで ** を使った再帰検索を有効にする。", ) parser.add_argument( "--dry-run", action="store_true", help="Excelに書き込まず、読み取り結果の概要だけ表示する。", ) parser.add_argument( "--primary-sheet", default="primary", help="primaryデータを書き込むシート名。デフォルト: primary", ) parser.add_argument( "--files-sheet", default="files", help="ファイル単位の履歴を書き込むシート名。デフォルト: files", ) parser.add_argument( "--no-files-sheet", action="store_true", help="filesシートを作成・追記しない。", ) parser.add_argument( "--verbose", action="store_true", help="ファイルごとの読み取り件数を表示する。", ) return parser.parse_args(argv)
[ドキュメント] def main(argv: Optional[Sequence[str]] = None) -> int: """プログラムのメインエントリポイント。 詳細説明: コマンドライン引数を解析し、指定されたCSVファイルを読み込み、 測定データを抽出してExcelファイルに書き込む一連の処理を実行します。 入力ファイルが見つからない場合や、解析できるデータがない場合は エラーメッセージを出力し、適切な終了コードを返します。 `--dry-run` オプションが指定された場合は、Excelへの書き込みは行わず、 解析結果の概要を表示します。 :param argv: コマンドライン引数のリスト (テスト目的などでNone以外の値を渡す)。デフォルトはsys.argv[1:]。 :returns: 成功時は 0、エラー時は 1 を返す。 """ args = parse_args(argv) excel_path = Path(args.excel_file) input_files = expand_input_patterns(args.data_files, recursive=args.recursive) if not input_files: print("ERROR: 入力ファイルが見つかりません。", file=sys.stderr) for pat in args.data_files: print(f" pattern: {pat}", file=sys.stderr) return 1 all_records: List[Dict[str, Any]] = [] file_records: List[Dict[str, Any]] = [] for path in input_files: try: records, global_info = build_records_from_csv(path, encoding=args.encoding) except Exception as exc: print(f"ERROR: failed to parse {path}: {exc}", file=sys.stderr) if args.verbose: import traceback traceback.print_exc() continue all_records.extend(records) file_records.append(make_file_record(path, global_info, len(records))) if args.verbose: print(f"{path}: {len(records)} measurement rows") if args.dry_run: print_dry_run_summary(all_records) print(f"\nTotal files : {len(input_files)}") print(f"Total records : {len(all_records)}") return 0 if not all_records: print("ERROR: 追記できる測定データがありません。", file=sys.stderr) return 1 append_to_excel( excel_path, all_records, file_records, primary_sheet=args.primary_sheet, files_sheet=args.files_sheet, write_files_sheet=not args.no_files_sheet, ) print(f"Excel written : {excel_path}") print(f"Files parsed : {len(input_files)}") print(f"Rows appended : {len(all_records)}") return 0
if __name__ == "__main__": raise SystemExit(main())