import sys
import os.path
import csv
import re
from math import exp, log
import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
"""
逆畳み込み
1. Jacobi法による逆畳み込み
2. Gauss-Seidel法による逆畳み込み
参考文献:「科学計測のための波形データ処理」、南茂夫 編著 (CQ出版社, 1986)
動作確認用
3. numpy.convolveによる畳み込み
4. scipy.signal.deconvolveによる逆畳み込み
5. fftによる逆畳み込み
注意点:
yconv = np.convolve(yraw, ywf, **kwargs) / sum(ywf)
畳み込み関数を規格化するため、フィルター関数ywfの和で割る必要がある。
戻り値のリストは、フィルター関数の点数の1/2だけ、前後に拡張される。
mode = "same"を指定すると、戻り値のリストの範囲は入力関数の範囲に一致させられる。
逆畳み込みで元のデータに戻すには、拡張部分のデータも必要。
IDec, remainder = scipy.signal.deconvolve(Iconv, ywf) * sum(ywf)
逆畳み込みはも入力データの質に非常に敏感。以下の条件を満足しないと、ちゃんとした結果が得られない。
・フィルター関数の幅は入力データよりも短い
・フィルター関数の値はある程度の値よりも大きい(0に近い値があるとだめ?)
・入力データの前には、フィルター関数の幅よりも大きいゼロ領域が必要
・入力データにノイズがある場合は、適切に平滑化処理が必要
"""
#===================
# Global parametrs
#===================
# File
infile = "pes.csv"
# Analysis range
xmin = -4.5 # eV
xmax = 2.0 # eV
# mode = [gs|jacobi|convolve|fft]
mode = 'gs'
# Filter parameters for Gauss function
Wa = 0.121223 # eV
Grange = 2.0
# for update graph
sleeptime = 0.001
# for Jacobi/Gauss-Seidel method
dump = 1.0
nmaxiter = 300
eps = 1.0e-4
nsmooth = 5
zero_correction = 0
# only for 'convolve': convmode = [same|full|]
convmode = 'same'
# smoothmode = [convolve|extend|average]
smoothmode = 'convolve+extend'
# only for 'convolve' and 'fft': 'exnted data' parameters
kzero = 5
klin = 5
#===================
# 起動時引数
#===================
[ドキュメント]
def usage():
print("usage: python {} file mode convmode smoothmode xmin xmax Wa Grange kzero klin".format(scriptname))
print(" mode = [convolve|fft]")
print(" convmode = [''|full|same]")
print(" smoothmode = combination of [none|convolve|extend|average]")
print(" ex.: python {} pes.csv convolve same convolve+extend -4.5 2.0 0.12 2.0 1 5".format(scriptname))
print(" ex.: python {} pes.csv fft full convolve+extend -4.5 2.0 0.12 2.0 5 5".format(scriptname))
print("")
print("usage: python {} file mode xmin xmax Wa dump nmaxiter eps nsmooth zeroc".format(scriptname))
print(" mode = [gs|jacobi] gs=Gauss-Seidel method jacobi=Jacobi method")
print(" zeroc = [0|1] zero correction after a Jacobi/Gauss-Seidel cycle]")
print(" ex.: python {} pes.csv gs -6.0 2.0 0.12 1.0 300 1.0e-4 5 0".format(scriptname))
print("")
[ドキュメント]
def savecsv(outfile, header, datalist):
try:
print("Write to [{}]".format(outfile))
f = open(outfile, 'w')
except:
# except IOError:
print("Error: Can not write to [{}]".format(outfile))
else:
fout = csv.writer(f, lineterminator='\n')
fout.writerow(header)
# fout.writerows(data)
for i in range(0, len(datalist[0])):
a = []
for j in range(len(datalist)):
a.append(datalist[j][i])
fout.writerow(a)
f.close()
[ドキュメント]
def read_csv(infile, delimiter = ',', xmin = None, xmax = None):
data = []
try:
infp = open(infile, "r")
f = csv.reader(infp, delimiter = delimiter)
header = next(f)
for j in range(len(header)):
data.append([])
for row in f:
try:
E = float(row[0])
I = float(row[1])
except:
continue
if xmin <= E <= xmax:
for j in range(len(row)):
data[j].append(float(row[j]))
except:
print("Error: Can not read [{}]".format(infile))
exit()
return header, data
[ドキュメント]
def read_data(infile, xmin = None, xmax = None):
if '.TXT' in infile.upper():
header, data = read_csv(infile, '\t', xmin, xmax)
return header, -np.array(data[0]), np.array(data[2])
else:
header, data = read_csv(infile, ',', xmin, xmax)
return header, np.array(data[0]), np.array(data[1])
[ドキュメント]
def Gaussian(x, x0, whalf):
#A = 1/whalf * sqrt(ln2 / pi)
A = 0.469718639 / whalf
#a = whalf / sqrt(ln2)
a = whalf / 0.832554611
X = (x - x0) / a
return A * exp(-X*X)
[ドキュメント]
def Hij(xstep, Wa, Grange, i, j):
# ixG0 = int(Grange * Wa / xstep + 1.0001)
# if abs(j - i) > ixG0:
# return 0.0
return Gaussian((j - i) * xstep, 0.0, Wa)
[ドキュメント]
def make_wf(Wa, Grange, xstep):
ixG0 = int(Grange * Wa / xstep + 1.0001)
ixGmax = 2 * ixG0
nxGmax = ixG0 + 1
xG0 = ixG0 * xstep
xG = []
yG = []
for i in range(ixGmax+1):
x = i * xstep
xG.append(x)
yG.append(Gaussian(x, xG0, Wa))
SG = 0.0
for i in range(len(yG)-1):
SG += (yG[i] + yG[i+1]) / 2.0 * (xG[i+1] - xG[i])
for i in range(ixGmax+1):
yG[i] /= SG
print(" Range: {} in width".format(Grange * Wa))
print(" i range: {} - {} at center {}".format(0,ixGmax, xG0))
print(" ixGmax = ", ixGmax)
print(" SG = ", SG)
return xG, yG
[ドキュメント]
def convolve(xraw, yraw, ywf, **kwargs):
yconv = np.convolve(yraw, ywf, **kwargs) / sum(ywf)
n_new = len(yconv)
dn = n_new - len(yraw)
if dn > 0:
offset = int(dn / 2)
xmin = xraw[0]
xstep = xraw[1] - xmin
xmin_new = xmin - offset * xstep
print("convolve: the length of the output data has been changed "
+ "from {} to {}".format(len(yraw), n_new))
print(" Add offset = {}".format(offset))
print(" xmin changes: {} => {}".format(xmin, xmin_new))
x = np.array([xmin_new + i * xstep for i in range(n_new)])
return x, yconv
return xraw, yconv
[ドキュメント]
def extend_smooth(x, y, nzero, nlin, xstep = 0.0):
xmin = x[0]
xstep = x[1] - x[0]
xmin_new = x[0] - nzero * xstep
n_new = int(nzero + 1.0e-5) + len(x)
nlin = int(nlin + 1.0e-5)
nzero = int(nzero + 1.0e-5)
print("extend_smooth:")
print(" Add {} zeros at top of the data".format(nzero))
print(" xmin changes: {} => {}".format(xmin, xmin_new))
print(" Reshape {} input data with a linear filter".format(nlin))
xx = np.array([xmin_new + i * xstep for i in range(n_new)])
yy = np.zeros(n_new)
for i in range(nlin):
k = i / (nlin - 1)
yy[i+nzero] = k * y[i]
for i in range(len(x) - nlin):
yy[i+nzero+nlin] = y[i+nzero]
return xx, yy
[ドキュメント]
def SmoothingBySimpleAverage(y, n):
n2 = int(n / 2);
ndata = len(y);
ys = []
for i in range(0, ndata):
c = 0;
ys.append(0.0);
for k in range(i - n2, i + n2 + 1):
if k < 0 or k >= ndata:
continue
ys[i] += y[k]
c += 1
if c > 0:
ys[i] /= c;
else:
ys[i] = y[i]
return ys;
[ドキュメント]
def SmoothingByPolynomialFit(y, n):
m = int(n / 2);
W23 = (4.0 * m * m - 1.0) * (2.0 * m + 3.0) / 3.0;
w23j = [0.0]*n
for j in range(-m, m+1):
w23j[j + m] = (3.0 * m * (m+1.0) - 1.0 - 5.0 * j * j) / W23
ndata = len(y)
ys = []
for i in range(0, ndata):
c = 0.0;
ys.append(0.0);
for j in range(-m, m+1):
k = i + j
if k < 0 or k >= ndata:
continue
ys[i] += w23j[j+m] * y[k]
c += w23j[j+m]
if c > 0:
ys[i] /= c
else:
ys[i] = y[i]
return ys;
[ドキュメント]
def deconvolute_fft(xRaw, yRaw, xG, yG):
k = sum(yG)
print("Deconvolution by FFT")
n = len(xRaw)
nlog = int(log(n) / log(2) + 1.0 - 1.0e-5)
nfft = pow(2, nlog)
print(" Data number is changed from {} to 2^{} = {} for FFT".format(n, nlog, nfft))
xmin = xRaw[0]
xstep = xRaw[1] - xmin
xRawFFT = [xmin + i * xstep for i in range(nfft)]
yRawFFT = np.insert(yRaw, len(yRaw), np.zeros(nfft - n))
# filterの中心位置の原点からのずれによって、iFFT後の原点がずれる
yGFFT = np.insert(yG, len(yG), np.zeros(nfft - len(yG)))
xminG = xmin + len(xG) / 2 * xstep
print(" xmin: ", xmin, xminG)
xGFFT = [xminG + i * xstep for i in range(nfft)]
# nadd = int((nfft - len(yG)) / 2)
# yGFFT = np.insert(yG, len(yG), np.zeros(nadd))
# yGFFT = np.insert(yGFFT, 0, np.zeros(nfft - len(yGFFT)))
yRawFFTed = np.fft.fft(yRawFFT)
yGFFTed = np.fft.fft(yGFFT)
ycFFTed = yRawFFTed / yGFFTed
ydeconv = np.fft.ifft(ycFFTed)
ydeconv = [float(ydeconv[i]) for i in range(len(ydeconv))]
print("")
return xGFFT, ydeconv, xRawFFT, yRawFFT, xRawFFT, yGFFT
[ドキュメント]
def deconvolute_deconvolve(xRaw, yRaw, xG, yG):
k = sum(yG)
print("Deconvolution by scipy.signal.deconvove")
print("")
IDec, remainder = scipy.signal.deconvolve(yRaw, yG)
IDec *= k
ndata = len(xRaw)
nGhalf = int(len(xG) / 2)
return xRaw[nGhalf:ndata-nGhalf], IDec
[ドキュメント]
def deconvolute_jacobi(xRaw, yRaw, xG, yG, fig, ax):
global Wa, Grange
k = sum(yG)
print("Deconvolution by Jacobi method")
print("")
xstep = xRaw[1] - xRaw[0]
xgmin = min(xRaw)
xgmax = max(xRaw)
n = len(xRaw)
Sg = np.zeros(n)
for i in range(n):
for j in range(n):
Sg[i] += Hij(xstep, Wa, Grange, i, j)
print("Filter area w.r.t. i: Sg=", Sg[int(n/2)])
ymax = max([abs(yRaw[i]) for i in range(n)])
y = yRaw.copy()
yPrev = yRaw.copy()
for it in range(nmaxiter):
Hx = np.zeros(n)
print("iter=", it)
for i in range(n):
for j in range(n):
Hx[i] += Hij(xstep, Wa, Grange, i, j) * yPrev[j]
h = Hij(xstep, Wa, Grange, i, i)
y[i] = yPrev[i] + (yRaw[i] - Hx[i] / Sg[i]) / h * dump
# y = SmoothingBySimpleAverage(y, nsmooth)
y = SmoothingByPolynomialFit(y, nsmooth)
if zero_correction:
for i in range(n):
if y[i] < 0.0:
y[i] = 0.0
ax[0].cla()
data1 = ax[0].plot(xRaw, yRaw, label = 'raw/initial')
data1 = ax[0].plot(xRaw, y, label = 'updated')
# data4 = ax[2].plot(xG, yG, label = 'filter')
ax[0].set_xlim([xgmin, xgmax])
ygmax = max([max(xRaw), max(y)])
ax[0].set_ylim([0.0, ygmax])
# ax[1].set_xlim([xgmin, xgmax])
# ax[1].set_ylim([0.0, max(yRaw)])
# ax[2].set_xlim([xgmin, xgmax])
# ax[2].set_ylim([0.0, max(yG)])
ax[0].legend()
# ax[2].legend()
plt.tight_layout()
plt.pause(sleeptime)
max_err = max([abs(y[i] - yPrev[i]) for i in range(n)])
rel_err = max_err / ymax
print(" max error: ", max_err, " relative error: ", rel_err, " eps=", eps)
if max_err / ymax < eps:
print("Converged at max_err={} ({} relative) < {}".format(max_err, rel_err, eps))
break
yPrev = y.copy()
else:
print("Not converged")
return xRaw, y
[ドキュメント]
def deconvolute_gauss_seidel(xRaw, yRaw, xG, yG, fig, ax):
global Wa, Grange
k = sum(yG)
print("Deconvolution by Jacobi method")
print("")
xstep = xRaw[1] - xRaw[0]
xgmin = min(xRaw)
xgmax = max(xRaw)
n = len(xRaw)
Sg = np.zeros(n)
for i in range(n):
for j in range(n):
Sg[i] += Hij(xstep, Wa, Grange, i, j)
print("Filter area w.r.t. i: Sg=", Sg[int(n/2)])
ymax = max([abs(yRaw[i]) for i in range(n)])
y = yRaw.copy()
yPrev = yRaw.copy()
for it in range(nmaxiter):
Hx = np.zeros(n)
print("iter=", it)
for i in range(n):
for j in range(i):
Hx[i] += Hij(xstep, Wa, Grange, i, j) * y[j]
for j in range(i, n):
Hx[i] += Hij(xstep, Wa, Grange, i, j) * yPrev[j]
h = Hij(xstep, Wa, Grange, i, i)
y[i] = yPrev[i] + (yRaw[i] - Hx[i] / Sg[i]) / h * dump
# y = SmoothingBySimpleAverage(y, nsmooth)
y = SmoothingByPolynomialFit(y, nsmooth)
if zero_correction:
for i in range(n):
if y[i] < 0.0:
y[i] = 0.0
ax[0].cla()
data1 = ax[0].plot(xRaw, yRaw, label = 'raw/initial')
data1 = ax[0].plot(xRaw, y, label = 'updated')
# data4 = ax[2].plot(xG, yG, label = 'filter')
ax[0].set_xlim([xgmin, xgmax])
ygmax = max([max(xRaw), max(y)])
ax[0].set_ylim([0.0, ygmax])
# ax[1].set_xlim([xgmin, xgmax])
# ax[1].set_ylim([0.0, max(yRaw)])
# ax[2].set_xlim([xgmin, xgmax])
# ax[2].set_ylim([0.0, max(yG)])
ax[0].legend()
# ax[2].legend()
plt.tight_layout()
plt.pause(sleeptime)
max_err = max([abs(y[i] - yPrev[i]) for i in range(n)])
rel_err = max_err / ymax
print(" max error: ", max_err, " relative error: ", rel_err, " eps=", eps)
if max_err / ymax < eps:
print("Converged at max_err={} ({} relative) < {}".format(max_err, rel_err, eps))
break
yPrev = y.copy()
else:
print("Not converged")
return xRaw, y
#======================
# main
#======================
[ドキュメント]
def main():
argv = sys.argv
scriptname = argv[0]
if len(argv) == 1:
usage()
exit()
if len(argv) >= 2:
infile = argv[1]
if len(argv) >= 3:
mode = argv[2]
if mode == 'convolve' or mode == 'fft':
if len(argv) >= 4:
convmode = argv[3]
if len(argv) >= 5:
smoothmode = argv[4]
if len(argv) >= 6:
xmin = float(argv[5])
if len(argv) >= 7:
xmax = float(argv[6])
if len(argv) >= 8:
Wa = float(argv[7])
if len(argv) >= 9:
Grange = float(argv[8])
if len(argv) >= 10:
kzero = float(argv[9])
if len(argv) >= 11:
klin = float(argv[10])
elif mode == 'gs' or mode == 'jacobi':
convmode = ''
smoothmode = ''
if len(argv) >= 4:
xmin = float(argv[3])
if len(argv) >= 5:
xmax = float(argv[4])
if len(argv) >= 6:
Wa = float(argv[5])
if len(argv) >= 7:
dump = float(argv[6])
if len(argv) >= 8:
nmaxiter = int(argv[7])
if len(argv) >= 9:
eps = float(argv[8])
if len(argv) >= 10:
nsmooth = int(argv[9])
if len(argv) >= 11:
zero_correction = int(argv[10])
else:
print("")
print("Error: Invalid mode=[{}]".format(mode))
usage()
exit()
header, ext = os.path.splitext(infile)
outcsvfile = header + '-deconvoluted.csv'
print("infile : ", infile)
print("outfile : ", outcsvfile)
print("mode : ", mode)
if mode == 'convolve' or mode == 'fft':
print("For mode = 'convolve' or 'fft':")
print(" convmode: ", convmode)
print(" x range : ", xmin, xmax)
if mode == 'gs' or mode == 'jacobi':
print("For mode = 'gs' or 'jacobi':")
print(" dump=", dump)
print(" nmaxiter=", nmaxiter)
print(" eps=", eps)
print(" nsmooth=", nsmooth)
print(" zero correction=", zero_correction)
print("")
header, xin, yin = read_data(infile, xmin, xmax)
nindata = len(xin)
xinmin = xin[0]
xinstep = xin[1] - xin[0]
xinmax = xinmin + xinstep * (nindata - 1)
print("Input data")
print(" ndata = ", nindata)
print(" x range: {} - {} at {} step".format(xinmin, xinmax, xinstep))
print("Smooth mode: ", smoothmode)
print("")
xRaw = xin
yRaw = yin
xstep = xinstep
print("Filter: Wa = {} Grange = {}".format(Wa, Grange))
xG, yG = make_wf(Wa, Grange, xstep)
print("")
fig = plt.figure(figsize = (15,7))
ax = []
ax.append(fig.add_subplot(3, 1, 1))
ax.append(fig.add_subplot(3, 1, 2))
ax.append(fig.add_subplot(3, 1, 3))
# make data to be deconvolved
nw = int(len(xG) / 2)
if 'average' in smoothmode:
yRaw = SmoothingBySimpleAverage(yRaw, 31)
if 'extend' in smoothmode:
xRaw, yRaw = extend_smooth(xRaw, yRaw, nw*kzero, nw*klin, xstep)
if 'convolve' in smoothmode:
xconv, yconv = convolve(xRaw, yRaw, yG, mode = convmode)
else:
xconv, yconv = xRaw, yRaw
# deconvolution
if mode == 'fft':
xDec, yDec, xRawFFT, yRawFFT, xGFFT, yGFFT = deconvolute_fft(xconv, yconv, xG, yG)
xconv = xRawFFT
yconv = yRawFFT
datafft = ax[2].plot(xGFFT, yGFFT, label = 'WF for FFT')
elif mode == 'jacobi':
xDec, yDec = deconvolute_jacobi(xconv, yconv, xG, yG, fig, ax)
yG = [Gaussian(xRaw[i], 0.0, Wa) for i in range(len(xRaw))]
datafft = ax[2].plot(xRaw, yG, label = 'WF for Jacobi method')
elif mode == 'gs':
xDec, yDec = deconvolute_gauss_seidel(xconv, yconv, xG, yG, fig, ax)
yG = [Gaussian(xRaw[i], 0.0, Wa) for i in range(len(xRaw))]
datafft = ax[2].plot(xRaw, yG, label = 'WF for Gauss-Seidel method')
else:
xDec, yDec = deconvolute_deconvolve(xconv, yconv, xG, yG)
datafft = ax[2].plot(xG, yG, label = 'WF for convolve')
xgmin = min([min(xconv), min(xDec)])
xgmax = max([max(xconv), max(xDec)])
ax[0].cla()
data1 = ax[0].plot(xconv, yconv, label = 'input(convoluted)')
data3 = ax[0].plot(xDec, yDec, label = 'deconvoluted')
data1 = ax[1].plot(xRaw, yRaw, label = 'input(raw)')
data2 = ax[1].plot(xconv, yconv, label = 'input(convoluted)')
ax[0].set_xlim([xgmin, xgmax])
ax[0].set_ylim([0.0, max([max(yRaw), max(yDec)])])
ax[1].set_xlim([xgmin, xgmax])
ax[1].set_ylim([0.0, max(yRaw)])
ax[2].set_xlim([xgmin, xgmax])
ax[2].set_ylim([0.0, max(yG)])
# ax[1].set_ylim([0.0, max(yRaw)])
ax[0].legend()
ax[1].legend()
ax[2].legend()
plt.tight_layout()
plt.pause(0.001)
print("")
print("Save deconvoluted data to [{}]".format(outcsvfile))
savecsv(outcsvfile, ['x', 'y(input)', 'y(deconvoluted)'], [xconv, yconv, yDec])
print("Press ENTER to exit>>", end = '')
input()
if __name__ == '__main__':
usage()
main()