"""
Excelファイルをデータベースのように操作する機能を提供するモジュールです。
tkExcelクラスを拡張し、条件に基づいてExcelシートの行を選択したり、
Pandas DataFrameにデータを変換したりする機能を提供します。
Excelシートの特定の範囲をテーブルとして扱い、行をレコード、列をフィールドとして操作します。
関連リンク: :doc:`tkexcel_db_usage`
"""
import os
import sys
import copy
import re
import openpyxl
#import msoffcrypto
import pandas as pd
from tklib.tkutils import pint, pfloat, del_quote, split_two, split_quoted_args, split_command_line, quote_command_if_space
from tklib.tkexcel import tkExcel, tkExcel_sheet
[ドキュメント]
class tkExcelDB(tkExcel):
"""
Excelファイルをデータベースのように扱うためのクラスです。
tkExcelクラスを継承し、SQLのような条件指定によるデータの検索や、
Pandas DataFrameへのデータ変換機能を提供します。
特に、特定のシートをテーブルとして扱い、行をレコード、列をフィールドとして操作します。
"""
def __init__(self, path = None, mode = 'r', table_name = None, password = None, allow_no_password = False, tmp_file = None,
OpenFile = True, CloseFile = False, try_text_file = False, data_only = True, description = '', split_from = 1, **args):
"""
tkExcelDBクラスの新しいインスタンスを初期化します。
親クラスtkExcelの初期化に加え、データベースライクな操作に必要な追加プロパティを設定します。
指定されたtable_nameに基づいてワークシートを特定し、モードが'w'の場合は新しいワークブックを作成します。
:param path: str, optional: Excelファイルのパス。デフォルトはNone。
:param mode: str, optional: ファイルのオープンモード ('r' for read, 'w' for write)。デフォルトは'r'。
:param table_name: str, optional: 操作対象とするワークシートの名前。正規表現も可能。デフォルトはNone。
:param password: str, optional: Excelファイルのパスワード。デフォルトはNone。
:param allow_no_password: bool, optional: パスワードなしのファイルも許可するかどうか。デフォルトはFalse。
:param tmp_file: str, optional: 一時ファイルとして開く場合のパス。デフォルトはNone。
:param OpenFile: bool, optional: インスタンス作成時にファイルを開くかどうか。デフォルトはTrue。
:param CloseFile: bool, optional: インスタンス作成後にファイルを閉じるかどうか。デフォルトはFalse。
:param try_text_file: bool, optional: Excelファイルとして開けない場合、テキストファイルとして開くことを試みるか。デフォルトはFalse。
:param data_only: bool, optional: セルの値を取得する際に、数式ではなく計算済みの値を取得するか。デフォルトはTrue。
:param description: str, optional: オブジェクトの任意の記述。デフォルトは''。
:param split_from: int, optional: ヘッダー行の開始インデックス(1始まり)。デフォルトは1。
:param args: dict: 親クラスtkExcelに渡される追加のキーワード引数。
:returns: None
"""
self.description = description
self.fp = None
self.path = None
self.mode = None
self.password = None
self.tmp_file = None
self.try_text_file = try_text_file
self.wb = None
self.ws = None
self.table_name = table_name
self.isheet = None
self.sheetname = None
self.dataframe = None
self.split_from = split_from
super().__init__(path = path, mode = mode, password = password, allow_no_password = allow_no_password, tmp_file = tmp_file,
OpenFile = OpenFile, CloseFile = CloseFile, try_text_file = try_text_file, data_only = data_only, split_from = split_from, **args)
if mode == 'w': #self.path is None:
self.wb = workbook = openpyxl.Workbook()
self.ws = self.wb.active
if OpenFile and self.wb and self.table_name:
if self.ws is not None:
self.isheet, self.sheetname, self.ws = self.find_isheet(self.table_name, reg_exp = True)
if CloseFile:
self.close()
# self.update(**args)
# self.ws = self.wb.worksheets[0]
def __del__(self):
"""
tkExcelDBオブジェクトが破棄される際のクリーンアップ処理です。
現在は特に処理を行いませんが、将来的にリソース解放などの処理を追加する可能性があります。
:returns: None
"""
# self.close()
pass
def __str__(self):
"""
オブジェクトの文字列表現を返します。
クラスの完全パス(モジュールとクラス名)を返します。
:returns: str: クラスの完全なパス。
"""
return self.ClassPath()
[ドキュメント]
def print_inf(self):
"""
現在のtkExcelDBオブジェクトに関する情報を標準出力に表示します。
オブジェクトのdescription、path、mode、ワークシートの数と名前、
および現在のワークシートの情報を出力します。
:returns: None
"""
print("")
print(f"description: {self.description}")
print(f" path: {self.path}")
print(f" mode: {self.mode}")
print(f" # of worksheet: {len(self.wb.worksheets)}")
for i in range(len(self.wb.sheetnames)):
print(f" {i}: {self.wb.sheetnames[i]}")
isheet, sheetname, ws = self.find_isheet(self.wb.worksheets)
if isheet is None:
print(f" Error: current worksheet cannot be identified")
else:
print(f" current worksheet #{isheet}: {self.wb.sheetnames[isheet]}")
[ドキュメント]
def use_dataframe(self, labels, data_list):
"""
与えられたデータからPandas DataFrameを作成し、クラスの属性に設定します。
`labels`を列名とし、`data_list`を行データとしてDataFrameを構築します。
作成されたDataFrameは`self.dataframe`に格納され、同時に返されます。
:param labels: list[str]: DataFrameの列名となる文字列のリスト。
:param data_list: list[list]: DataFrameの行データとなるリストのリスト。
:returns: pandas.DataFrame: 作成されたPandas DataFrame。
"""
df = pd.DataFrame(data_list, columns = labels)
self.dataframe = df
return df
[ドキュメント]
def add_to_list(self, irow, itargets, labels, data_list, data_dict):
"""
指定された行の特定の列の値をリストと辞書に追加します。
`itargets`で指定された列インデックスに対応するセルの値を`data_list`に追加し、
`labels`に対応するキーで`data_dict`に値を追加します。
:param irow: int: 値を取得する行のインデックス(1始まり)。
:param itargets: list[int]: 値を取得する列のインデックスのリスト(1始まり)。
:param labels: list[str]: `data_dict`のキーとして使用されるラベルのリスト。`itargets`と対応する。
:param data_list: list: 取得した値を追加するリスト。
:param data_dict: dict: 取得した値をラベルをキーとして追加する辞書。
:returns: None
"""
for i in range(len(itargets)):
icol = itargets[i]
label = labels[i]
val = self.get(irow = irow, icol = icol, def_val = '')
data_list.append(val)
data_dict[label] = val
[ドキュメント]
def evaluate(self, var1_pos, operator, str1, str2, args, irow = '', def_val = '', is_print = False):
"""
2つの値と演算子に基づいて条件を評価します。
数値比較 ('==', '!=', '<', '<=', '>', '>='), 文字列比較 ('eq', 'neq'),
正規表現マッチ ('like', 'not like') をサポートします。
数値比較の場合は引数を浮動小数点数に変換しようとします。
:param var1_pos: str: 最初の変数`str1`の元々の位置情報。現在未使用だが将来的な拡張のために残されている。
:param operator: str: 比較演算子(例: '==', '!=', '<', '>', '<=', '>=', 'eq', 'neq', 'like', 'not like')。
:param str1: str: 比較対象となる1つ目の値。
:param str2: str: 比較対象となる2つ目の値。
:param args: list: 将来の拡張のための追加引数(現在未使用)。
:param irow: str, optional: エラーメッセージに含める行インデックス。デフォルトは''。
:param def_val: str, optional: 値が取得できない場合のデフォルト値。現在未使用。
:param is_print: bool, optional: エラーメッセージを標準出力に表示するかどうか。デフォルトはFalse。
:returns: bool or None: 評価結果が真であればTrue、偽であればFalse。数値変換エラーが発生した場合はNone。
"""
fval1 = pfloat(str1, 'nan')
fval2 = pfloat(str2, 'nan')
str1 = f"{str1}"
is_ok = False
if operator == '==' or operator == '!=' or operator == '<' or operator == '<=' or operator == '>' or operator == '>=':
if fval1 == 'nan':
if is_print:
print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid argment #1 [{str1}] for operator [{operator}]")
return None
if fval2 == 'nan':
if is_print:
print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid argment #2 [{str2}] for operator [{operator}]")
return None
if operator == '==':
if fval1 == fval2:
is_ok = True
if operator == '!=':
if fval1 != fval2:
is_ok = True
elif operator == '<':
if fval1 < fval2:
is_ok = True
elif operator == '<=':
if fval1 <= fval2:
is_ok = True
elif operator == '>':
if fval1 > fval2:
is_ok = True
elif operator == '>=':
if fval1 >= fval2:
is_ok = True
elif operator == 'eq':
if str1 == str2:
is_ok = True
elif operator == 'neq':
if str1 != str2:
is_ok = True
elif operator == 'like':
if re.search(str2, str1):
is_ok = True
elif operator == 'not like':
if not re.search(str2, str1):
is_ok = True
else:
print(f"Error in tkexcel_db.evaluate(): irow={irow}: Invalid operator [{operator}]")
exit()
return is_ok
[ドキュメント]
def select_irows(self, condition, key_column_org = 1, key_row_org = 1, target_row_org = 2, is_print = False, first_hit_only = False):
"""
指定された条件に一致する行のインデックスのリストを返します。
`condition`文字列を解析し、Excelシートの各行をループして条件を評価します。
AND/OR演算子を含む複雑な条件を処理できます。
`first_hit_only`がTrueの場合、最初に見つかった行のインデックスのみを返します。
:param condition: str: 検索条件を表す文字列(例: 'Label1 == "value1" and Label2 < 10')。
:param key_column_org: int, optional: ヘッダー行の開始列インデックス(1始まり)。デフォルトは1。
:param key_row_org: int, optional: ヘッダー行の開始行インデックス(1始まり)。デフォルトは1。
:param target_row_org: int, optional: データ行の開始行インデックス(1始まり)。検索はこの行から開始されます。デフォルトは2。
:param is_print: bool, optional: 検索情報を標準出力に表示するかどうか。デフォルトはFalse。
:param first_hit_only: bool, optional: 最初に見つかった一致行のインデックスのみを返すか、全ての一致行のインデックスを返すか。デフォルトはFalse。
:returns: list[int] or int or None: 条件に一致する行のインデックスのリスト。`first_hit_only`がTrueで一致行が見つかった場合はその行インデックス(int)。一致行が見つからなかった場合はNone。
"""
if is_print:
print("")
print(f"Search [{condition}] to find irow")
# 検索スタート
hit = []
_args = split_quoted_args(condition, quotation = '"')
for irow in range(target_row_org, self.max_row() + 1):
args = _args.copy()
while len(args) > 0:
var1_pos = 'h'
if args[0] == 'and' or args[0] == 'or':
operator_h = args[0]
args = args[1:]
else:
operator_h = 'first'
var_name1 = del_quote(args[0])
args = args[1:]
if args[0] == 'not' and args[1] == 'like':
operator = "not like"
str2 = del_quote(args[2])
args = args[3:]
else:
operator = args[0]
str2 = del_quote(args[1])
args = args[2:]
# print(f"op: [{var_name1}] [{operator}] [{str2}] (args={args})")
ivar1 = self.get_icolumn_from_label_regex(var_name1, sheet = self.ws,
column_org = key_column_org, row_org = key_row_org)
str1 = self.get(irow = irow, icol = ivar1, def_val = '')
# print(f"h={operator_h}: {var_name1}={str1} {operator} {str2} (args={args})")
ret = self.evaluate(var1_pos, operator, str1, str2, args, irow = irow, def_val = '', is_print = False)
# print("ret=", ret)
if ret is None:
break
if operator_h == 'first':
is_ok = ret
elif operator_h == 'and':
# print(" and:", is_ok, ret, end = '')
is_ok = is_ok and ret
# print(" => ", is_ok)
elif operator_h == 'or':
# print(" and:", is_ok, ret, end = '')
is_ok = is_ok or ret
# print(" => ", is_ok)
# print(" is_ok=", is_ok)
if is_ok is None or not is_ok:
continue
if first_hit_only:
return irow
hit.append(irow)
if first_hit_only:
return None
return hit
[ドキュメント]
def select(self, condition, target_labels = None,
key_column_org = 1, target_row_org = 2, key_row_org = 1,
ret_type = 'list', is_print = False):
"""
指定された条件に一致するデータをExcelシートから選択し、指定された形式で返します。
まず`select_irows`を呼び出して一致する行インデックスを取得します。
その後、`target_labels`で指定された列のデータを取得し、`ret_type`に応じて
リストのリスト、辞書のリスト、または関連情報を含む辞書として返します。
:param condition: str: 検索条件を表す文字列(例: 'Label1 == "value1" and Label2 < 10')。
:param target_labels: list[str], optional: 取得する列のラベルのリスト。Noneの場合、すべての列のラベルが使用されます。デフォルトはNone。
:param key_column_org: int, optional: ヘッダー行の開始列インデックス(1始まり)。デフォルトは1。
:param target_row_org: int, optional: データ行の開始行インデックス(1始まり)。検索はこの行から開始されます。デフォルトは2。
:param key_row_org: int, optional: ヘッダー行の開始行インデックス(1始まり)。デフォルトは1。
:param ret_type: str, optional: 戻り値の形式を指定します。
'list' (リストのリスト),
'dict' (辞書のリスト),
'all' (ラベル、リスト、辞書、行インデックスを含む辞書),
'irows' (ラベルと行インデックス)。
デフォルトは'list'。
:param is_print: bool, optional: 検索情報を標準出力に表示するかどうか。デフォルトはFalse。
:returns: tuple[list[str], list[list]] or tuple[list[str], list[dict]] or dict or tuple[list[str], list[int]]:
`ret_type`が'list'または'dict'の場合、(ラベルのリスト, データ)。
`ret_type`が'all'の場合、{"labels": labels, "list": res, "dict": res_dict, "irows": irow_list}。
`ret_type`が'irows'の場合、(ラベルのリスト, 行インデックスのリスト)。
無効な`ret_type`が指定された場合はプログラムを終了します。
"""
if is_print:
print("")
print(f"Search [{condition}] for ", target_labels)
if target_labels is None:
target_labels = self.get_labels(irow_origin = key_row_org, icol_origin = key_column_org)
# 戻り値に入れるフィールド番号・フィールド名のリスト
itargets = []
labels = []
for i in range(len(target_labels)):
idx = self.get_icolumn_from_label_regex(target_labels[i], sheet = self.ws,
column_org = key_column_org, row_org = key_row_org)
itargets.append(idx)
# print("i=", i, target_labels[i], idx, key_row_org)
label = self.get(irow = key_row_org, icol = idx, def_val = '')
labels.append(label)
# 検索スタート
_args = split_quoted_args(condition, quotation = '"')
irow_list = self.select_irows(condition = condition, key_column_org = key_column_org, key_row_org = key_row_org,
target_row_org = target_row_org, is_print = is_print)
res = []
res_dict = []
for irow in irow_list:
data_list = []
data_dict = {}
self.add_to_list(irow, itargets, labels, data_list, data_dict)
# print("data_list=", data_list)
# 戻り値のリストに追加
res_dict.append(data_dict)
if ret_type == 'list' or ret_type == 'all':
res.append(data_list)
elif ret_type == 'dict':
res.append(data_dict)
else:
print("")
print(f"Error in tkexcel_db.select(): Invalid ret_type=[{ret_type}]")
exit()
if ret_type == 'all':
return {"labels": labels, "list": res, "dict": res_dict, "irows": irow_list}
if ret_type == 'irows':
return labels, irows
return labels, res