-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathmann_kendall.py
249 lines (216 loc) · 10.4 KB
/
mann_kendall.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 29 09:16:06 2015
@author: Michael Schramm
Modified on August 5th 2020
by : Philippe Ostiguy
"""
from __future__ import division
import numpy as np
from scipy.stats import norm
from manip_data import ManipData as md
from initialize import Initialize
from init_operations import InitOp as io
import copy
class MannKendall(Initialize):
"""
Mann-Kendall is a non-parametric test to determine if there is a trend in a time-series
"""
def __init__(self,series_,self_,alpha=0.01,iteration=True):
super().__init__()
super().__call__()
new_obj = copy.deepcopy(self_)
self.__dict__.update(new_obj.__dict__)
del self_, new_obj
io.init_series(self)
self.alpha=alpha
self.first_iteration=iteration
self.nb_sign=0
self.sous_series =md.sous_series_(series_,self.nb_data)
self.series_mk = series_
def mk(self):
"""
I'm not the original writer of this function, it comes from github :
https://github.com/mps9506/Mann-Kendall-Trend/blob/master/mk_test.py
The goal here is to calculate the Mann Kendall value at data point, so to
save time, we just substract the first value and add the last value when
we go to a new data point.
This function is derived from code originally posted by Sat Kumar Tomer
(satkumartomer@gmail.com)
See also: http://vsp.pnnl.gov/help/Vsample/Design_Trend_Mann_Kendall.htm
The purpose of the Mann-Kendall (MK) test (Mann 1945, Kendall 1975, Gilbert
1987) is to statistically assess if there is a monotonic upward or downward
trend of the variable of interest over time. A monotonic upward (downward)
trend means that the variable consistently increases (decreases) through
time, but the trend may or may not be linear. The MK test can be used in
place of a parametric linear regression analysis, which can be used to test
if the slope of the estimated linear regression line is different from
zero. The regression analysis requires that the residuals from the fitted
regression line be normally distributed; an assumption not required by the
MK test, that is, the MK test is a non-parametric (distribution-free) test.
Hirsch, Slack and Smith (1982, page 107) indicate that the MK test is best
viewed as an exploratory analysis and is most appropriately used to
identify stations where changes are significant or of large magnitude and
to quantify these findings.
By default, it is a two-side test
Input:
x: a vector of data
alpha: significance level (0.01 default)
Output:
trend: tells the trend (increasing, decreasing or no trend)
h: True (if trend is present) or False (if trend is absence)
p: p value of the significance test
z: normalized test statistics
Return value : -1 if there is a negative trend (at the significance level)
+1 if there is positive trend (at the significance level)
"""
sous_series_ = self.sous_series.loc[:,self.default_data]
n = len(sous_series_)
# calculate positive and negative sign
if self.first_iteration:
for k in range(n-1):
for j in range(k+1, n):
self.nb_sign += np.sign(sous_series_.values[j] - sous_series_.values[k])
# if we iterate through time, we use previous calculation and add new value and substract old value
else:
for k in range(n-1):
self.nb_sign += np.sign(sous_series_.values[n-1] - sous_series_.values[k])
self.sous_series= md.sous_series_(self.series_mk,self.nb_data,point_data=self.point_data-1)
sous_series_= self.sous_series.loc[:,self.default_data]
n = len(sous_series_)
for k in range(n-1):
self.nb_sign -= np.sign(sous_series_.values[k+1] - sous_series_.values[0])
self.first_iteration = False
# calculate the unique data
unique_x, tp = np.unique(sous_series_.values, return_counts=True)
g = len(unique_x)
# calculate the var(s)
if n == g: # there is no tie
var_s = (n*(n-1)*(2*n+5))/18
else: # there are some ties in data
var_s = (n*(n-1)*(2*n+5) - np.sum(tp*(tp-1)*(2*tp+5)))/18
if self.nb_sign > 0:
z = (self.nb_sign - 1)/np.sqrt(var_s)
elif self.nb_sign < 0:
z = (self.nb_sign + 1)/np.sqrt(var_s)
else: # self.nb_sign == 0:
z = 0
# calculate the p_value
p = 2*(1-norm.cdf(abs(z))) # two tail test
h = abs(z) > norm.ppf(1-self.alpha/2)
if (z < 0) and h:
trend = -1
elif (z > 0) and h:
trend = 1
else:
trend = 0
# return +1 if there a positive trend, -1 if there a negative trend and 0 if none.
return trend
def check_num_samples(beta, delta, std_dev, alpha=0.05, n=4, num_iter=1000,
tol=1e-6, num_cycles=10000, m=5):
"""
This function is an implementation of the "Calculation of Number of Samples
Required to Detect a Trend" section written by Sat Kumar Tomer
(satkumartomer@gmail.com) which can be found at:
http://vsp.pnnl.gov/help/Vsample/Design_Trend_Mann_Kendall.htm
As stated on the webpage in the URL above the method uses a Monte-Carlo
simulation to determine the required number of points in time, n, to take a
measurement in order to detect a linear trend for specified small
probabilities that the MK test will make decision errors. If a non-linear
trend is actually present, then the value of n computed by VSP is only an
approximation to the correct n. If non-detects are expected in the
resulting data, then the value of n computed by VSP is only an
approximation to the correct n, and this approximation will tend to be less
accurate as the number of non-detects increases.
Input:
beta: probability of falsely accepting the null hypothesis
delta: change per sample period, i.e., the change that occurs between
two adjacent sampling times
std_dev: standard deviation of the sample points.
alpha: significance level (0.05 default)
n: initial number of sample points (4 default).
num_iter: number of iterations of the Monte-Carlo simulation (1000
default).
tol: tolerance level to decide if the predicted probability is close
enough to the required statistical power value (1e-6 default).
num_cycles: Total number of cycles of the simulation. This is to ensure
that the simulation does finish regardless of convergence
or not (10000 default).
m: if the tolerance is too small then the simulation could continue to
cycle through the same sample numbers over and over. This parameter
determines how many cycles to look back. If the same number of
samples was been determined m cycles ago then the simulation will
stop.
Examples
--------
num_samples = check_num_samples(0.2, 1, 0.1)
"""
# Initialize the parameters
power = 1.0 - beta
P_d = 0.0
cycle_num = 0
min_diff_P_d_and_power = abs(P_d - power)
best_P_d = P_d
max_n = n
min_n = n
max_n_cycle = 1
min_n_cycle = 1
# Print information for user
print("Delta (gradient): {}".format(delta))
print("Standard deviation: {}".format(std_dev))
print("Statistical power: {}".format(power))
# Compute an estimate of probability of detecting a trend if the estimate
# Is not close enough to the specified statistical power value or if the
# number of iterations exceeds the number of defined cycles.
while abs(P_d - power) > tol and cycle_num < num_cycles:
cycle_num += 1
print("Cycle Number: {}".format(cycle_num))
count_of_trend_detections = 0
# Perform MK test for random sample.
for i in xrange(num_iter):
r = np.random.normal(loc=0.0, scale=std_dev, size=n)
x = r + delta * np.arange(n)
trend, h, p, z = mk_test(x, alpha)
if h:
count_of_trend_detections += 1
P_d = float(count_of_trend_detections) / num_iter
# Determine if P_d is close to the power value.
if abs(P_d - power) < tol:
print("P_d: {}".format(P_d))
print("{} samples are required".format(n))
return n
# Determine if the calculated probability is closest to the statistical
# power.
if min_diff_P_d_and_power > abs(P_d - power):
min_diff_P_d_and_power = abs(P_d - power)
best_P_d = P_d
# Update max or min n.
if n > max_n and abs(best_P_d - P_d) < tol:
max_n = n
max_n_cycle = cycle_num
elif n < min_n and abs(best_P_d - P_d) < tol:
min_n = n
min_n_cycle = cycle_num
# In case the tolerance is too small we'll stop the cycling when the
# number of cycles, n, is cycling between the same values.
elif (abs(max_n - n) == 0 and
cycle_num - max_n_cycle >= m or
abs(min_n - n) == 0 and
cycle_num - min_n_cycle >= m):
print("Number of samples required has converged.")
print("P_d: {}".format(P_d))
print("Approximately {} samples are required".format(n))
return n
# Determine whether to increase or decrease the number of samples.
if P_d < power:
n += 1
print("P_d: {}".format(P_d))
print("Increasing n to {}".format(n))
print("")
else:
n -= 1
print("P_d: {}".format(P_d))
print("Decreasing n to {}".format(n))
print("")
if n == 0:
raise ValueError("Number of samples = 0. This should not happen.")