import sys import os import signal import builtins import numpy as np from numpy import exp, log, sin, cos, tan, arcsin, arccos, arctan, sinh, cosh, tanh, sqrt, abs import pandas as pd from scipy.optimize import minimize import matplotlib.pyplot as plt import matplotlib.widgets as wg #======================================================================= # ctrl-cの割り込みテスト # scipy.optimizeで割り込んでいるようで、使えない #======================================================================= class tkStopHandler(): def __init__(self): self.stop_flag = False def __call__(self, signal, frame): while 1: print("") print("Type 'yes' if terminates the program>> ", end = '') keyin = input() if keyin == 'yes': print(" Stopping") self.stop_flag = True return True else: print(" Cancelled") break return True #stop_hander = tkStopHandler() #signal.signal(signal.SIGINT, stop_hander) #while(True): # if stop_hander.stop_flag: # exit(0) #======================================================================= # ctrl-cの割り込みテスト: ここまで #======================================================================= from tklib.tkutils import replace_path, getarg, getintarg, getfloatarg, pint, pfloat from tklib.tkapplication import tkApplication from tklib.tkparams import tkParams from tklib.tksci.tksci import asin, acos, atan, degcos, degsin, degtan, degacos, degasin, degatan, arcsin, arccos, arctan from tklib.tksci.tksci import factorial, log10, gamma, combination, eVTonm, nmToeV, Bn from tklib.tksci.tksci import h, h_bar, hbar, e, kB, NA, c, pi, pi2, torad, todeg, basee from tklib.tksci.tksci import me, mp, mn from tklib.tksci.tksci import u0, e0, a0, R, F, g, G from tklib.tksci.tksci import HartreeToeV, RyToeV, KToeV, eVToK, JToeV, eVToJ, Debye from tklib.tksci.tkFit import tkFit #======================================= # プログラム開始 #======================================= def initialize(app): cparams = tkParams() app.cparams = cparams #nelder-mead Downhill simplex #powell Modified Powell #cg conjugate gradient (Polak-Ribiere method) #bfgs BFGS法 #newton-cg Newton-CG #trust-ncg 信頼領域 Newton-CG 法 #dogleg 信頼領域 dog-leg 法 #L-BFGS-B’ (see here) #TNC’ (see here) #COBYLA’ (see here) #SLSQP’ (see here) #trust-constr’(see here) #dogleg’ (see here) #trust-exact’ (see here) #trust-krylov’ (see here) cparams.method = "bfgs" #========================================== # Source parameters to be fitted #========================================== cparams.infile = 'peak.xlsx' cparams.nx = None cparams.mode = 'fit' cparams.func = "p[0]*exp(-((x[0]-p[1])/p[2])**2)" cparams.p0s = "1.3,0.6,0.1" cparams.fit_range = "-1e100:1e100, -1e100:1e100, -1e100:1e100" cparams.olabel = '' cparams.xlabel = '' cparams.ylabel = '' cparams.zlabel = '' # 数値微分する際の変数の微小変位 cparams.h = 0.01 cparams.maxiter = 100 cparams.tol = 1.0e-5 #========================================== # Graph parameters #========================================== cparams.fplot = 1 cparams.ngdata = 51 cparams.xgmin = -4.0 cparams.xgmax = 4.0 cparams.tsleep = 0.01 def update_vars(app): cparams = app.cparams argv = sys.argv # n = len(argv) #for i in range(n): # print("i=", i, argv[i]) cparams.mode = getarg( 1, cparams.mode) cparams.infile = getarg( 2, cparams.infile) cparams.method = getarg( 3, cparams.method) cparams.func = getarg( 4, cparams.func) cparams.p0s = getarg( 5, cparams.p0s) cparams.fit_range = getarg( 6, cparams.fit_range) cparams.maxiter = getintarg (7, cparams.maxiter) cparams.tol = getfloatarg(8, cparams.tol) cparams.j = getfloatarg(9, cparams.h) cparams.olabel = getarg(10, cparams.olabel) cparams.xlabel = getarg(11, cparams.xlabel) cparams.ylabel = getarg(12, cparams.ylabel) cparams.zlabel = getarg(13, cparams.zlabel) # app.add_argument(opt = "-s", type = "str", var_name = 'script_list_name', opt_str = "-s=script_list_name", desc = 'Script list file', # defval = "default", optional = True); # app.add_argument(opt = "-i", type = "float", var_name = 'idx', opt_str = "-i=integer", desc = 'Index to speify alpha', # defval = 5, optional = False); # app.add_argument(opt = None, type = "str", var_name = 'infile', opt_str = "infile", desc = 'Input file', # defval = "input.txt", optional = False); # app.add_argument(opt = None, type = "str", var_name = 'nmax', opt_str = "nmax", desc = 'Max number of interation', # defval = 5, optional = False); # args_opt, args_idx, args_vars = app.read_args(vars = cparams.__dict__, check_allowed_args = True) # if args_opt is None: # error_no = args_idx # error_message = args_vars # app.terminate("\n\n" # + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" # + f"! {error_message}\n" # + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", # usage = app.usage) # app.set_usage(usage_str) #========================================== # functions #========================================== def cal_y(xd, pk, app): # print("caly: xd=", xd) # print("caly: pk=", pk) # print("caly: func=", func) y = eval(app.cparams.func, globals(), {"x": xd, "p": pk}) return y def ycal_list(xd, pk, app): y = [] for i in range(len(xd[0])): _xk = [xd[j][i] for j in range(len(xd))] y.append(cal_y(_xk, pk, app)) return y # 1次微分を定義するとcgやbfgsなどの勾配法を使える def diff1(pk, xd, yd, app, callback): nvar = len(pk) diff = pk.copy() h = app.cparams.h for i in range(nvar): _pk = pk.copy() _pk[i] = pk[i] - h ym = minimize_func(_pk, xd, yd, app, callback) _pk[i] = pk[i] + h yp = minimize_func(_pk, xd, yd, app, callback) diff[i] = (yp - ym) / 2.0 / h return diff def minimize_func(pk, xd, yd, app, callback): if callback.stop_flag: return 1.0e100 cparams = app.cparams # print("pk=", pk) nd = len(xd[0]) nvar = len(xd) MSE = 0.0 for i in range(nd): do_skip = False for j in range(nvar): xmin, xmax = cparams.fit_range[j] if xd[j][i] < xmin or xmax < xd[j][i]: do_skip = True break if do_skip: continue _xk = [xd[j][i] for j in range(nvar)] yc = cal_y(_xk, pk, app) d = yd[i] - yc MSE += d * d MSE /= nd callback.stop_pk = pk callback.stop_MSE = MSE return MSE # callbackを使うと、最適化過程をモニターできる # 引数は変数のリストだけが渡される # 反復回数などはglobal変数で保持する class tkCallback(): def __init__(self, app): self.app = app self.stop_flag = False self.iter = 0 self.xiter = [] self.yfmin = [] self.ax_fit = None self.fitdata = None self.ax_conv = None self.convdata = None def __call__(self, pk, xd, yd): if self.stop_flag: return False w = plt.get_current_fig_manager().window if w != self.window: return False MSE = minimize_func(pk, xd, yd, self.app, self) print(f"callback {self.iter}: pk={pk} MSE={MSE}") self.iter += 1 self.xiter.append(self.iter) self.yfmin.append(MSE) self.convdata[0].set_data(self.xiter, self.yfmin) self.ax_conv.set_xlim((0.0, max(self.xiter) + 1.0)) self.ax_conv.set_ylim((min(self.yfmin) * 0.8, max(self.yfmin) * 1.2)) index = range(len(xd[0])) yc = ycal_list(xd, pk, self.app) if len(xd) > 1: self.fitdata[0].set_data(index, yc) else: self.fitdata[0].set_data(xd[0], yc) if self.app.cparams.fplot: plt.tight_layout() plt.subplots_adjust(top = 0.90, bottom = 0.10) plt.pause(self.app.cparams.tsleep) return True def fit(app): global fp global ax_fit, ax_conv global fitdata, convdata cparams = app.cparams callback = tkCallback(app) app.log_path = app.replace_path(cparams.infile) app.redict(["stdout", app.log_path], "w") cparams.p0s = [float(s) for s in cparams.p0s.split(',')] r = [] for s in cparams.fit_range.split(','): xmin, xmax = s.split(':') r.append([float(xmin), float(xmax)]) cparams.fit_range = r cparams.out_file = replace_path(cparams.infile, "{dirname}/{filebody}-fitting.xlsx") cparams.convergence_file = replace_path(cparams.infile, "{dirname}/{filebody}-convergence.xlsx") cparams.parameter_file = replace_path(cparams.infile, "{dirname}/{filebody}.prm") print("") print("Fitting program to given function") print(f"mode : {cparams.mode}") print(f"method : {cparams.method}") print(f"input file : {cparams.infile}") print(f"output file : {cparams.out_file}") print(f"convergence file: {cparams.convergence_file}") print(f"parameter file : {cparams.parameter_file}") print(f"func : {cparams.func}") print(f"Objective function: {cparams.olabel}") print(f"x : {cparams.xlabel}") print(f"y : {cparams.ylabel}") print(f"z : {cparams.zlabel}") print(f"initial values : {cparams.p0s}") print(f"tol : {cparams.tol}") print(f"maxiter : {cparams.maxiter}") if '***' in cparams.method: input("\nError: Choose method\n") exit() if '***' in cparams.func: input("\nError: Choose or input function\n") exit() print("") print(f"Read [{cparams.infile}]") if os.path.exists(cparams.infile): df = pd.read_excel(cparams.infile, engine = 'openpyxl') else: print("") print(f"File [{cparams.infile}] does not exist.") exit() labels = df.columns.to_list() yd = df[cparams.olabel].to_numpy() xd = [] xd_labels = [] if cparams.xlabel != '---': xd.append(df[cparams.xlabel].to_numpy()) xd_labels.append(cparams.xlabel) if cparams.ylabel != '---': xd.append(df[cparams.ylabel].to_numpy()) xd_labels.append(cparams.ylabel) if cparams.zlabel != '---': xd.append(df[cparams.zlabel].to_numpy()) xd_labels.append(cparams.zlabel) nx = len(xd[0]) print("") print("x ranges:") for i in range(len(xd)): xmin = min(xd[i]) xmax = max(xd[i]) print(f" x[{i}]: {xmin} - {xmax}") print(f" fit in: {cparams.fit_range[i][0]} - {cparams.fit_range[i][1]}") print("") print("Calculate initial function:") index = range(nx) yini = ycal_list(xd, cparams.p0s, app) if cparams.fplot == 1: figure, axes = plt.subplots(1, 2, figsize = (10, 5)) ax_fit = axes[0] ax_conv = axes[1] if len(xd) > 1: ax_fit.plot(index, yd, color = 'blue', linestyle = '', linewidth = 0.5, fillstyle = 'full', marker = 'x', markersize = 5) ax_fit.plot(index, yini, color = 'black', linestyle = 'dashed', linewidth = 0.5) fitdata = ax_fit.plot([], [], color = 'red', linestyle = '-', linewidth = 0.5) ax_fit.set_xlabel('index') else: ax_fit.plot(xd[0], yd, color = 'blue', linestyle = '', linewidth = 0.5, fillstyle = 'full', marker = 'x', markersize = 5) ax_fit.plot(xd[0], yini, color = 'black', linestyle = 'dashed', linewidth = 0.5) fitdata = ax_fit.plot([], [], color = 'red', linestyle = '-', linewidth = 0.5) ax_fit.set_xlabel(cparams.xlabel) ax_fit.set_ylabel(cparams.olabel) ax_conv.set_yscale('log') convdata = ax_conv.plot([], [], color = 'black', linestyle = '-', linewidth = 0.5, fillstyle = 'full', marker = 'o', markersize = 5) ax_conv.set_xlabel('# of iteration') ax_conv.set_ylabel('MSE') callback.ax_fit = ax_fit callback.ax_conv = ax_conv callback.fitdata = fitdata callback.convdata = convdata callback.window = plt.get_current_fig_manager().window # tight_layoutをするとsubplots_adjust()が無効化されるので、この順番で設定する plt.tight_layout() plt.subplots_adjust(top = 0.90, bottom = 0.10) def button_click(e): callback.stop_flag = True ax_button = plt.axes([0.15, 0.95, 0.10, 0.03]) button = wg.Button(ax_button, 'stop', color = '#f8e58c', hovercolor = '#38b48b') button.on_clicked(button_click) plt.pause(0.001) if cparams.mode == 'sim': print("Press ENTER to terminate:", end = '') ret = input() exit() print("") print("Minimize:") res = minimize(lambda pk: minimize_func(pk, xd, yd, app, callback), cparams.p0s, method = cparams.method, jac = lambda pk: diff1(pk, xd, yd, app, callback), callback = lambda pk: callback(pk, xd, yd), tol = cparams.tol, options = {'maxiter': cparams.maxiter, "disp": True}) # print("") # print(res) print("") if res.success: print(f"Function MSE takes the minimum") print(f" at MSE={res.fun}") print(f" with pk={res.x}") print(f" iteration: {res.nit}") else: print(f"Function did not converge") print(res) yfin = ycal_list(xd, res.x, app) print("") print(f"Save results to [{cparams.out_file}]") df = pd.DataFrame(np.array([*xd, yd, yini, yfin]).T, columns = [*xd_labels, cparams.olabel, 'initial', 'final']) df.to_excel(cparams.out_file) print("") print(f"Save convergence process to [{cparams.convergence_file}]") df = pd.DataFrame(np.array([callback.xiter, callback.yfmin]).T, columns = ['iter', 'MSE']) df.to_excel(cparams.convergence_file) print("") print(f"Save parameters to [{cparams.parameter_file}]") cparams.fit_range = "{}".format(cparams.fit_range) cresults = tkParams() cresults.initial_values = f"{cparams.p0s}" cresults.final_values = "{}".format(list(res.x)) cresults.MAE = res.fun cresults.iteration = res.nit del cparams.p0s cresults.initial_values = cresults.initial_values.replace('[', '').replace(']', '') cresults.final_values = cresults.final_values.replace('[', '').replace(']', '') cparams.save_parameters(cparams.parameter_file, section = 'Parameters', sort_by_keys = False, update_commandline = False, IsPrint = False) cresults.save_parameters(cparams.parameter_file, section = 'Fitting', sort_by_keys = False, update_commandline = False, IsPrint = False) input("Press ENTER to terminate >> ") #========================================== # Main routine #========================================== def main(): app = tkApplication() print(f"Initialize parameters") initialize(app) print(f"Update parameters by command-line arguments") update_vars(app) fit(app) if __name__ == "__main__": main()