-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcompute_icdl_opt_param.py
124 lines (97 loc) · 3.46 KB
/
compute_icdl_opt_param.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Find optimal ICDL parameters for each test case by optimizing over 768 different parameter combinations.
"""
import os
import sys
import glob
import copy
from socket import gethostname
from functools import partial
import numpy as np
from psutil import cpu_count
import sporco.fft
sporco.fft.pyfftw_threads = 1
from sporco.signal import gaussian
from sporco.util import idle_cpu_count, grid_search
from cdlpsf.cdl import PSFEstimator
from cdlpsf.util import sm_snr
# Define standard integer sampling grid -wp ... wp
wp = 7
# Subpixel estimation factor (common for all runs)
M = 5
# Common noise level
slct_noise = 1.0
# Evaluate PSF estimation performance for specified parameters
def psf_est_performance(prm, K, img, psfref):
sigma0, lmbdaX, lmbdaG, rhoX, LG = prm
g0 = gaussian((2*wp+1,) * 2, sigma0)
g0 /= np.linalg.norm(g0)
opt = PSFEstimator.Options(
{'Verbose': False, 'MaxMainIter': 100,
'XslvIter0': 10, 'GslvIter0': 10,
'Xslv': {'NonNegCoef': False, 'rho': rhoX},
'Gslv': {'L': LG}})
pe = PSFEstimator(img, g0, lmbdaX, lmbdaG, M, K, opt)
psfgrd = pe.solve()
return sm_snr(psfref, psfgrd)
# Algorithm parameter ranges
paramranges = {
'sigma0': np.array([0.5, 1.0, 1.5, 2.0]),
'lmbdaX': np.array([1e-3, 1e-2, 1e-1, 5e-1]),
'lmbdaG': np.array([1e-2, 5e-2, 1e-1, 5e-1]),
'rhoX': np.array([1e0, 1e1, 1e2]),
'LG': np.array([5e1, 1e2, 5e2, 1e3])
}
prmcmb = np.product([paramranges[v].size for v in paramranges])
print('Evaluating %d parameter combinations per input file' % prmcmb)
# Paths to data files
imgpath = 'data/simulated_images'
psfpath = 'data/reference_psfs'
rsltpath = 'data/icdl_results'
if not os.path.isdir(rsltpath):
os.makedirs(rsltpath)
rsltfile = os.path.join(rsltpath, 'icdl_opt_param_pps.npz')
# Number of processors to use for parameter evaluation
nproc = cpu_count(logical=False)
results = {}
imgfiles = sorted(glob.glob(os.path.join(imgpath, '*.npz')))
# Iterate over input image files
for imgfile in imgfiles:
# Load data from simulated image file
npz = np.load(imgfile)
img = npz['imgn']
noise = float(npz['noise'])
shape = str(npz['shape'])
pps = int(npz['pixperstar'])
# Skip input images for different noise levels
if noise != slct_noise:
continue
basename = os.path.basename(imgfile)[0:-4]
print('Computing parameters for %s' % imgfile)
# Rescale image range
imx = img.max()
img /= imx
# Select Lanczos kernel order depending on PSF shape
if shape == 'complex' or shape == 'narrow':
K = 5
else:
K = 10
# Load reference PSF
psffile = os.path.join(psfpath, '%s.npz' % shape)
npz = np.load(psffile, allow_pickle=True)
psfref = npz['refpsf'].item()[M]
# Evaluate performance over selected parameter ranges
perf = partial(psf_est_performance, K=K, img=img, psfref=psfref)
sprm, sfvl, fvmx, sidx = grid_search(
perf, (paramranges['sigma0'], paramranges['lmbdaX'],
paramranges['lmbdaG'], paramranges['rhoX'],
paramranges['LG']),
fmin=False, nproc=nproc)
# Add results for this image to results dict
results[basename] = {'shape': shape, 'pps': pps, 'noise': noise,
'K': K, 'sprm': sprm, 'sfvl': sfvl, 'fvmx': fvmx,
'sidx': sidx}
# Save data to NPZ file
np.savez(rsltfile, paramranges=paramranges, results=results)