-
Notifications
You must be signed in to change notification settings - Fork 0
/
running_cpd.py
142 lines (118 loc) · 6.45 KB
/
running_cpd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import time
import os
import numpy as np
import networkx as nx
import numba_cost_functions as my_numb
import result_related as my_res
import utils as my_ut
import rpy2_related as my_rpy2
from typing import List, Literal
from math import floor
from tqdm import tqdm
from scipy.linalg import eigh
### UTILS
#########
def init_pred_saving(pred_dir, name):
results = {}
if not os.path.exists(os.path.join(pred_dir, name)):
my_ut.create_parent_and_dump_json(pred_dir, name, results, indent=4)
return os.path.join(pred_dir, name)
### INITIALIZATION
##################
def init_station_normal_cost(signal, graph_laplacian_mat):
'''signal (array): of shape [n_samples, n_dim]'''
# computation of the graph fourier transform
_, eigvects = eigh(graph_laplacian_mat)
gft = signal @ eigvects # equals signal.dot(eigvects) = eigvects.T.dot(signal.T).T
gft_mean = np.mean(gft, axis=0)
# computation of the per-segment cost utils
gft_square_cumsum = np.concatenate([np.zeros((1, signal.shape[1])), np.cumsum((gft - gft_mean[None, :])**2, axis=0)], axis=0)
return gft_square_cumsum.astype(np.float64)
### CPD DYNAMIC PROGRAMMING SOLVER
##################################
def rglasso_cpd_dynprog(n_bkps:int, min_size:int, signal, pen_mult_coef, buffer_path):
# path_mat[n, K] avec n --> [y_0, ... y_{n-1}] (very important to understand indexing) , K : n_bkps
# sum_of_cost_mat[n, K]: best cost for signal until sample n with K bkps
# we fill-in the sum_of_cost_mat row by row, i.e for longer and longer signals (top to down), for higher and
# highr number of bkp (left to right), because in dynamic programming
# we need the results of the previous sub-problems before solving the current one
# initialization
n_samples = signal.shape[0]
path_mat = np.empty((n_samples+1, n_bkps+1), dtype=np.int32)
path_mat[:, 0] = 0
path_mat[0, :] = -1
sum_of_cost_mat = np.full((n_samples+1, n_bkps+1), fill_value=np.inf, dtype=np.float64)
sum_of_cost_mat[0, :] = 0
np.save(f"{buffer_path}.npy", signal)
r_signal = my_rpy2.load_r_signal(f"{buffer_path}.npy")
# pre-computation, to optimize jit processing
statio_segment_cost = np.full((n_samples+1, n_samples+1), fill_value=np.inf, dtype=np.float64)
for start in tqdm(range(0, n_samples-min_size+1), desc='Looping over the segments start in glasso'):
for end in range(start+min_size, n_samples+1):
statio_segment_cost[start, end] = my_rpy2.glasso_cost_func(start, end, r_signal, pen_mult_coef)
# forward computation
for end in range(min_size, n_samples+1):
sum_of_cost_mat[end, 0] = statio_segment_cost[0, end]
# consistent because our cost functions compute the costs over [start, end[
max_admissible_n_bkp = floor(end/min_size) - 1
for k_bkps in range(1, min(max_admissible_n_bkp+1, n_bkps+1)):
soc_optim = np.inf
soc_argmin = -1
# the following step corresponds to the last line of eq (21) P.19 Truong, Oudre, Vayatis
for mid in range(min_size*k_bkps, end - min_size + 1):
soc = sum_of_cost_mat[mid, k_bkps-1] + statio_segment_cost[mid, end]
if soc < soc_optim:
soc_argmin = mid
soc_optim = soc
sum_of_cost_mat[end, k_bkps] = soc_optim
path_mat[end, k_bkps] = soc_argmin
# backtracking
bkps = np.full((n_bkps+1), fill_value=n_samples)
for k_bkps in range(n_bkps, 0, -1):
bkps[k_bkps-1] = path_mat[bkps[k_bkps], k_bkps]
return bkps
### CPD TRIGGER
###############
def run_numba_statio_normal_cost_and_store_res(G: nx.Graph, signal: np.ndarray, gt_bkps: List[int], min_size: int, statio_json_path: str, exp_id: int):
# running CPD algorithm
t1 = time.perf_counter()
graph_lapl_mat = nx.laplacian_matrix(G).toarray().astype(np.float64)
gft_square_cumsum = init_station_normal_cost(signal, graph_lapl_mat)
statio_bkps = my_numb.numba_cpd_dynprog_statio_cost_optim(len(gt_bkps)-1, min_size, gft_square_cumsum)
statio_bkps = [int(bkp) for bkp in statio_bkps]
t2 = time.perf_counter()
# logging
res = my_res.update_pred_dic_with_one_exp(t1, t2, statio_bkps, gt_bkps, exp_id)
my_ut.load_and_write_json(statio_json_path, exp_id, my_ut.turn_all_list_of_dict_into_str(res), indent=4)
def run_numba_standard_mle_normal_cost_and_store_res(signal: np.ndarray, gt_bkps: List[int], min_size:int, normal_json_path: str, exp_id:int):
# running CPD algorithm
t1 = time.perf_counter()
normal_bkps = my_numb.numba_cpd_dynprog_mle_standard_cost_optim(len(gt_bkps) - 1, min_size, signal)
normal_bkps = [int(bkp) for bkp in normal_bkps]
t2 = time.perf_counter()
# logging
res = my_res.update_pred_dic_with_one_exp(t1, t2, normal_bkps, gt_bkps, exp_id)
my_ut.load_and_write_json(normal_json_path, exp_id, my_ut.turn_all_list_of_dict_into_str(res), indent=4)
def run_r_glasso_cpd_algo_and_store(signal, gt_bkps: List[int], glasso_json_path: str, exp_id: str, pen_mult_coef: float, buffer_path: str):
# running CPD algorithm
t1 = time.perf_counter()
glasso_bkps = rglasso_cpd_dynprog(n_bkps=len(gt_bkps)-1, min_size=signal.shape[1], signal=signal, pen_mult_coef=pen_mult_coef, buffer_path=buffer_path)
t2 = time.perf_counter()
glasso_bkps.sort()
glasso_bkps = [int(bkp) for bkp in glasso_bkps]
# logging
res = my_res.update_pred_dic_with_one_exp(t1, t2, glasso_bkps, gt_bkps, exp_id)
my_ut.load_and_write_json(glasso_json_path, exp_id, my_ut.turn_all_list_of_dict_into_str(res), indent=4)
def run_r_covcp_cpd_algo_and_store(signal_path, gt_bkps: List[int], covcp_json_path: dict, stable_set_length:int, min_seg_length:int, window_sizes, alpha, exp_id, nb_cores, r_covcp_seed):
# running CPD algorithm
t1 = time.perf_counter()
print('Running R covcp')
my_rpy2.init_r_core_management(nb_cores, r_covcp_seed)
r_signal = my_rpy2.load_r_signal(f"{signal_path}_signal.npy")
covcp_bkps = my_rpy2.detect_multiple_covcp_bkps(n_bkps=len(gt_bkps)-1, r_signal=r_signal, stable_set_length=stable_set_length, min_seg_length=min_seg_length, window_sizes=window_sizes, alpha=alpha, bkps=[])
t2 = time.perf_counter()
covcp_bkps.sort()
covcp_bkps = [int(bkp) for bkp in covcp_bkps] + [gt_bkps[-1]]
# logging
res = my_res.update_pred_dic_with_one_exp(t1, t2, covcp_bkps, gt_bkps, exp_id)
my_ut.load_and_write_json(covcp_json_path, exp_id, my_ut.turn_all_list_of_dict_into_str(res), indent=4)