tklib.tkexcel_db のソースコード

"""
Excelファイルをデータベースのように操作する機能を提供するモジュールです。

tkExcelクラスを拡張し、条件に基づいてExcelシートの行を選択したり、
Pandas DataFrameにデータを変換したりする機能を提供します。
Excelシートの特定の範囲をテーブルとして扱い、行をレコード、列をフィールドとして操作します。

関連リンク: :doc:`tkexcel_db_usage`
"""
import os
import sys
import copy
import re
import openpyxl
#import msoffcrypto
import pandas as pd


from tklib.tkutils import pint, pfloat, del_quote, split_two, split_quoted_args, split_command_line, quote_command_if_space
from tklib.tkexcel import tkExcel, tkExcel_sheet


[ドキュメント] class tkExcelDB(tkExcel): """ Excelファイルをデータベースのように扱うためのクラスです。 tkExcelクラスを継承し、SQLのような条件指定によるデータの検索や、 Pandas DataFrameへのデータ変換機能を提供します。 特に、特定のシートをテーブルとして扱い、行をレコード、列をフィールドとして操作します。 """ def __init__(self, path = None, mode = 'r', table_name = None, password = None, allow_no_password = False, tmp_file = None, OpenFile = True, CloseFile = False, try_text_file = False, data_only = True, description = '', split_from = 1, **args): """ tkExcelDBクラスの新しいインスタンスを初期化します。 親クラスtkExcelの初期化に加え、データベースライクな操作に必要な追加プロパティを設定します。 指定されたtable_nameに基づいてワークシートを特定し、モードが'w'の場合は新しいワークブックを作成します。 :param path: str, optional: Excelファイルのパス。デフォルトはNone。 :param mode: str, optional: ファイルのオープンモード ('r' for read, 'w' for write)。デフォルトは'r'。 :param table_name: str, optional: 操作対象とするワークシートの名前。正規表現も可能。デフォルトはNone。 :param password: str, optional: Excelファイルのパスワード。デフォルトはNone。 :param allow_no_password: bool, optional: パスワードなしのファイルも許可するかどうか。デフォルトはFalse。 :param tmp_file: str, optional: 一時ファイルとして開く場合のパス。デフォルトはNone。 :param OpenFile: bool, optional: インスタンス作成時にファイルを開くかどうか。デフォルトはTrue。 :param CloseFile: bool, optional: インスタンス作成後にファイルを閉じるかどうか。デフォルトはFalse。 :param try_text_file: bool, optional: Excelファイルとして開けない場合、テキストファイルとして開くことを試みるか。デフォルトはFalse。 :param data_only: bool, optional: セルの値を取得する際に、数式ではなく計算済みの値を取得するか。デフォルトはTrue。 :param description: str, optional: オブジェクトの任意の記述。デフォルトは''。 :param split_from: int, optional: ヘッダー行の開始インデックス(1始まり)。デフォルトは1。 :param args: dict: 親クラスtkExcelに渡される追加のキーワード引数。 :returns: None """ self.description = description self.fp = None self.path = None self.mode = None self.password = None self.tmp_file = None self.try_text_file = try_text_file self.wb = None self.ws = None self.table_name = table_name self.isheet = None self.sheetname = None self.dataframe = None self.split_from = split_from super().__init__(path = path, mode = mode, password = password, allow_no_password = allow_no_password, tmp_file = tmp_file, OpenFile = OpenFile, CloseFile = CloseFile, try_text_file = try_text_file, data_only = data_only, split_from = split_from, **args) if mode == 'w': #self.path is None: self.wb = workbook = openpyxl.Workbook() self.ws = self.wb.active if OpenFile and self.wb and self.table_name: if self.ws is not None: self.isheet, self.sheetname, self.ws = self.find_isheet(self.table_name, reg_exp = True) if CloseFile: self.close() # self.update(**args) # self.ws = self.wb.worksheets[0] def __del__(self): """ tkExcelDBオブジェクトが破棄される際のクリーンアップ処理です。 現在は特に処理を行いませんが、将来的にリソース解放などの処理を追加する可能性があります。 :returns: None """ # self.close() pass def __str__(self): """ オブジェクトの文字列表現を返します。 クラスの完全パス(モジュールとクラス名)を返します。 :returns: str: クラスの完全なパス。 """ return self.ClassPath()
[ドキュメント] def print_inf(self): """ 現在のtkExcelDBオブジェクトに関する情報を標準出力に表示します。 オブジェクトのdescription、path、mode、ワークシートの数と名前、 および現在のワークシートの情報を出力します。 :returns: None """ print("") print(f"description: {self.description}") print(f" path: {self.path}") print(f" mode: {self.mode}") print(f" # of worksheet: {len(self.wb.worksheets)}") for i in range(len(self.wb.sheetnames)): print(f" {i}: {self.wb.sheetnames[i]}") isheet, sheetname, ws = self.find_isheet(self.wb.worksheets) if isheet is None: print(f" Error: current worksheet cannot be identified") else: print(f" current worksheet #{isheet}: {self.wb.sheetnames[isheet]}")
[ドキュメント] def use_dataframe(self, labels, data_list): """ 与えられたデータからPandas DataFrameを作成し、クラスの属性に設定します。 `labels`を列名とし、`data_list`を行データとしてDataFrameを構築します。 作成されたDataFrameは`self.dataframe`に格納され、同時に返されます。 :param labels: list[str]: DataFrameの列名となる文字列のリスト。 :param data_list: list[list]: DataFrameの行データとなるリストのリスト。 :returns: pandas.DataFrame: 作成されたPandas DataFrame。 """ df = pd.DataFrame(data_list, columns = labels) self.dataframe = df return df
[ドキュメント] def add_to_list(self, irow, itargets, labels, data_list, data_dict): """ 指定された行の特定の列の値をリストと辞書に追加します。 `itargets`で指定された列インデックスに対応するセルの値を`data_list`に追加し、 `labels`に対応するキーで`data_dict`に値を追加します。 :param irow: int: 値を取得する行のインデックス(1始まり)。 :param itargets: list[int]: 値を取得する列のインデックスのリスト(1始まり)。 :param labels: list[str]: `data_dict`のキーとして使用されるラベルのリスト。`itargets`と対応する。 :param data_list: list: 取得した値を追加するリスト。 :param data_dict: dict: 取得した値をラベルをキーとして追加する辞書。 :returns: None """ for i in range(len(itargets)): icol = itargets[i] label = labels[i] val = self.get(irow = irow, icol = icol, def_val = '') data_list.append(val) data_dict[label] = val
[ドキュメント] def evaluate(self, var1_pos, operator, str1, str2, args, irow = '', def_val = '', is_print = False): """ 2つの値と演算子に基づいて条件を評価します。 数値比較 ('==', '!=', '<', '<=', '>', '>='), 文字列比較 ('eq', 'neq'), 正規表現マッチ ('like', 'not like') をサポートします。 数値比較の場合は引数を浮動小数点数に変換しようとします。 :param var1_pos: str: 最初の変数`str1`の元々の位置情報。現在未使用だが将来的な拡張のために残されている。 :param operator: str: 比較演算子(例: '==', '!=', '<', '>', '<=', '>=', 'eq', 'neq', 'like', 'not like')。 :param str1: str: 比較対象となる1つ目の値。 :param str2: str: 比較対象となる2つ目の値。 :param args: list: 将来の拡張のための追加引数(現在未使用)。 :param irow: str, optional: エラーメッセージに含める行インデックス。デフォルトは''。 :param def_val: str, optional: 値が取得できない場合のデフォルト値。現在未使用。 :param is_print: bool, optional: エラーメッセージを標準出力に表示するかどうか。デフォルトはFalse。 :returns: bool or None: 評価結果が真であればTrue、偽であればFalse。数値変換エラーが発生した場合はNone。 """ fval1 = pfloat(str1, 'nan') fval2 = pfloat(str2, 'nan') str1 = f"{str1}" is_ok = False if operator == '==' or operator == '!=' or operator == '<' or operator == '<=' or operator == '>' or operator == '>=': if fval1 == 'nan': if is_print: print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid argment #1 [{str1}] for operator [{operator}]") return None if fval2 == 'nan': if is_print: print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid argment #2 [{str2}] for operator [{operator}]") return None if operator == '==': if fval1 == fval2: is_ok = True if operator == '!=': if fval1 != fval2: is_ok = True elif operator == '<': if fval1 < fval2: is_ok = True elif operator == '<=': if fval1 <= fval2: is_ok = True elif operator == '>': if fval1 > fval2: is_ok = True elif operator == '>=': if fval1 >= fval2: is_ok = True elif operator == 'eq': if str1 == str2: is_ok = True elif operator == 'neq': if str1 != str2: is_ok = True elif operator == 'like': if re.search(str2, str1): is_ok = True elif operator == 'not like': if not re.search(str2, str1): is_ok = True else: print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid operator [{operator}]") exit() return is_ok
[ドキュメント] def select_irows(self, condition, key_column_org = 1, key_row_org = 1, target_row_org = 2, is_print = False, first_hit_only = False): """ 指定された条件に一致する行のインデックスのリストを返します。 `condition`文字列を解析し、Excelシートの各行をループして条件を評価します。 AND/OR演算子を含む複雑な条件を処理できます。 `first_hit_only`がTrueの場合、最初に見つかった行のインデックスのみを返します。 :param condition: str: 検索条件を表す文字列(例: 'Label1 == "value1" and Label2 < 10')。 :param key_column_org: int, optional: ヘッダー行の開始列インデックス(1始まり)。デフォルトは1。 :param key_row_org: int, optional: ヘッダー行の開始行インデックス(1始まり)。デフォルトは1。 :param target_row_org: int, optional: データ行の開始行インデックス(1始まり)。検索はこの行から開始されます。デフォルトは2。 :param is_print: bool, optional: 検索情報を標準出力に表示するかどうか。デフォルトはFalse。 :param first_hit_only: bool, optional: 最初に見つかった一致行のインデックスのみを返すか、全ての一致行のインデックスを返すか。デフォルトはFalse。 :returns: list[int] or int or None: 条件に一致する行のインデックスのリスト。`first_hit_only`がTrueで一致行が見つかった場合はその行インデックス(int)。一致行が見つからなかった場合はNone。 """ if is_print: print("") print(f"Search [{condition}] to find irow") # 検索スタート hit = [] _args = split_quoted_args(condition, quotation = '"') for irow in range(target_row_org, self.max_row() + 1): args = _args.copy() while len(args) > 0: var1_pos = 'h' if args[0] == 'and' or args[0] == 'or': operator_h = args[0] args = args[1:] else: operator_h = 'first' var_name1 = del_quote(args[0]) args = args[1:] if args[0] == 'not' and args[1] == 'like': operator = "not like" str2 = del_quote(args[2]) args = args[3:] else: operator = args[0] str2 = del_quote(args[1]) args = args[2:] # print(f"op: [{var_name1}] [{operator}] [{str2}] (args={args})") ivar1 = self.get_icolumn_from_label_regex(var_name1, sheet = self.ws, column_org = key_column_org, row_org = key_row_org) str1 = self.get(irow = irow, icol = ivar1, def_val = '') # print(f"h={operator_h}: {var_name1}={str1} {operator} {str2} (args={args})") ret = self.evaluate(var1_pos, operator, str1, str2, args, irow = irow, def_val = '', is_print = False) # print("ret=", ret) if ret is None: break if operator_h == 'first': is_ok = ret elif operator_h == 'and': # print(" and:", is_ok, ret, end = '') is_ok = is_ok and ret # print(" => ", is_ok) elif operator_h == 'or': # print(" and:", is_ok, ret, end = '') is_ok = is_ok or ret # print(" => ", is_ok) # print(" is_ok=", is_ok) if is_ok is None or not is_ok: continue if first_hit_only: return irow hit.append(irow) if first_hit_only: return None return hit
[ドキュメント] def select(self, condition, target_labels = None, key_column_org = 1, target_row_org = 2, key_row_org = 1, ret_type = 'list', is_print = False): """ 指定された条件に一致するデータをExcelシートから選択し、指定された形式で返します。 まず`select_irows`を呼び出して一致する行インデックスを取得します。 その後、`target_labels`で指定された列のデータを取得し、`ret_type`に応じて リストのリスト、辞書のリスト、または関連情報を含む辞書として返します。 :param condition: str: 検索条件を表す文字列(例: 'Label1 == "value1" and Label2 < 10')。 :param target_labels: list[str], optional: 取得する列のラベルのリスト。Noneの場合、すべての列のラベルが使用されます。デフォルトはNone。 :param key_column_org: int, optional: ヘッダー行の開始列インデックス(1始まり)。デフォルトは1。 :param target_row_org: int, optional: データ行の開始行インデックス(1始まり)。検索はこの行から開始されます。デフォルトは2。 :param key_row_org: int, optional: ヘッダー行の開始行インデックス(1始まり)。デフォルトは1。 :param ret_type: str, optional: 戻り値の形式を指定します。 'list' (リストのリスト), 'dict' (辞書のリスト), 'all' (ラベル、リスト、辞書、行インデックスを含む辞書), 'irows' (ラベルと行インデックス)。 デフォルトは'list'。 :param is_print: bool, optional: 検索情報を標準出力に表示するかどうか。デフォルトはFalse。 :returns: tuple[list[str], list[list]] or tuple[list[str], list[dict]] or dict or tuple[list[str], list[int]]: `ret_type`が'list'または'dict'の場合、(ラベルのリスト, データ)。 `ret_type`が'all'の場合、{"labels": labels, "list": res, "dict": res_dict, "irows": irow_list}。 `ret_type`が'irows'の場合、(ラベルのリスト, 行インデックスのリスト)。 無効な`ret_type`が指定された場合はプログラムを終了します。 """ if is_print: print("") print(f"Search [{condition}] for ", target_labels) if target_labels is None: target_labels = self.get_labels(irow_origin = key_row_org, icol_origin = key_column_org) # 戻り値に入れるフィールド番号・フィールド名のリスト itargets = [] labels = [] for i in range(len(target_labels)): idx = self.get_icolumn_from_label_regex(target_labels[i], sheet = self.ws, column_org = key_column_org, row_org = key_row_org) itargets.append(idx) # print("i=", i, target_labels[i], idx, key_row_org) label = self.get(irow = key_row_org, icol = idx, def_val = '') labels.append(label) # 検索スタート _args = split_quoted_args(condition, quotation = '"') irow_list = self.select_irows(condition = condition, key_column_org = key_column_org, key_row_org = key_row_org, target_row_org = target_row_org, is_print = is_print) res = [] res_dict = [] for irow in irow_list: data_list = [] data_dict = {} self.add_to_list(irow, itargets, labels, data_list, data_dict) # print("data_list=", data_list) # 戻り値のリストに追加 res_dict.append(data_dict) if ret_type == 'list' or ret_type == 'all': res.append(data_list) elif ret_type == 'dict': res.append(data_dict) else: print("") print(f"Error in tkexcel_db.select(): Invalid ret_type=[{ret_type}]") exit() if ret_type == 'all': return {"labels": labels, "list": res, "dict": res_dict, "irows": irow_list} if ret_type == 'irows': return labels, irows return labels, res