-
Notifications
You must be signed in to change notification settings - Fork 1
/
nsc.py
348 lines (303 loc) · 8.96 KB
/
nsc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
"""Nearest subclass classifier (Cor J. Veenman, Marcel J.T. Reinders)
implementation essay"""
import sys, os
from math import sqrt as _sqrt, floor as _floor
from random import sample as _sample
if sys.version_info[0:2] < (2, 4):
from sets import Set as set, ImmutableSet as frozenset
dim=0 ### external programs should set this (dimensionality of the feature space)
k=3 ### rank-list neighbours examined per point when updating a cluster's outer border
q=1 ### rank-list neighbours examined per point when updating a cluster's inner border
eMax=20 ### 100 is suggested by the authors of NSC
noChangeMax=10 ### epochs without a union/perturbation change before mvc() stops
welt={}
### keys are class labels, values a set of points of that label
### this is necessary due to the pre-supervised nature of the algorithm
rank_list={}
### keys are points, value the rank list for the given point:
### a list of (distance, point) tuples sorted by increasing distance,
### filled by computeRLs() and only holding same-class points
class punkt:
    """defines a point in n-dimensions Euclidian space, contains:
    features (a tuple of coordinates) and klasse (the class label)."""
    def __init__(self, features, klasse):
        """initializes the point; features is copied into a tuple so the
        point stays hashable"""
        self.features=tuple(features)
        self.klasse=klasse
    def __eq__(self, other):
        """determines if two instances of point are identical.
        two points are different if are different either the features or
        the class label.
        note: only the first `dim` features are compared (module global)."""
        if isinstance(other, punkt):
            for i in xrange(dim):
                if self.features[i] != other.features[i]:
                    return False
            if self.klasse == other.klasse:
                return True
        ### explicit False (the original fell through returning None for
        ### non-punkt operands)
        return False
    def __ne__(self, other):
        """determines if two instances of point are different"""
        ### fixed: the original evaluated `not(__eq__(self, other))`, calling
        ### __eq__ as a bare name, which raises NameError at call time
        return not self.__eq__(other)
    def __hash__(self):
        """hashes the point (needed to inserting it in sets or dictionaries)"""
        return hash(self.klasse)^hash(self.features)
    def __repr__(self):
        """representation of the point: label, a tab, then the features
        with 8 decimal places"""
        representation=' '.join(['%.8f' % (i) for i in self.features])
        return '%s\t%s' % (self.klasse.__str__(), representation)
    __str__=__repr__
def distance(first, second):
    """returns the Euclidian distance between two points
    (only the first `dim` coordinates are considered)"""
    squared=0
    for axis in xrange(dim):
        squared+=(first.features[axis]-second.features[axis])**2
    return _sqrt(squared)
class kluster:
    """a cluster of points of the same class, contains:
    points, IB, ib, OB, ob, mean, variance, klasse.
    each and every operation on the cluster's content maintains the cluster's
    consistency.

    IB/OB are the inner/outer border point sets; ib/ob are their rounded
    square-root cardinalities, used by mvc() to size its random subsets."""
    def __init__(self, p=None):
        """initializes the cluster with a point or, if p is None, as void"""
        self.points=set()
        self.IB=set()
        self.OB=set()
        if p == None:
            self.flush()
            return
        ### single-point cluster: the point is its own mean, variance is
        ### zero, and the inner border is the point itself
        self.points.add(p)
        self.mean=punkt(p.features, p.klasse)
        self.variance=0.0
        self.klasse=p.klasse
        self.ib=0
        self.IB.add(p)
        self.ob=0
        self.updOB()
    def flush(self):
        """deletes all points of the cluster and resets every derived field"""
        self.points.clear()
        self.variance=0.0
        self.klasse=None
        self.mean=punkt(tuple([None]*dim), None) ### the punkt object HAS to
        self.IB.clear()                          ### be ALWAYS a tuple() of
        self.ib=0                                ### the RIGHT length
        self.OB.clear()
        self.ob=0
    def __repr__(self):
        """representation of the cluster: cardinality, variance and mean"""
        return 'len: %d, sigma: %.2f, mean: %s' % (len(self.points), self.variance, self.mean)
    __str__=__repr__
    def isVoid(self):
        """if a cluster is void or not"""
        if len(self.points) == 0:
            return True
        return False
    def updMean(self):
        """updates the cluster's mean object.
        this should be always run before updating the variance."""
        self.mean.features=[0]*dim ### now is a list() of zeros
        for i in xrange(dim):
            for p in self.points:
                self.mean.features[i]+=(p.features[i])
            ### __float__() forces true division under Python 2 even when the
            ### accumulated features are integers
            self.mean.features[i]=self.mean.features[i].__float__()/len(self.points)
        self.mean.features=tuple(self.mean.features) ### again a tuple()
    def updVariance(self):
        """updates cluster's variance (mean squared distance from the mean).
        requires that the mean object is consistent."""
        self.variance=0.0
        for p in self.points:
            for i in xrange(dim):
                self.variance+=(p.features[i]-self.mean.features[i])**2
        self.variance=self.variance.__float__()/len(self.points)
    def updIB(self):
        """updates the inner border of the cluster.
        for each point inside the cluster scans the related reversed rank list
        for internal points: at most q of the farthest same-cluster
        neighbours of each point are collected."""
        self.IB.clear()
        for p in self.points:
            limit=0
            ### reversed() exists only from Python 2.4; the else branch walks
            ### the rank list by descending index for older interpreters
            if sys.version_info[0:2] >= (2, 4):
                for elem in reversed(rank_list[p]):
                    if limit < q and elem[1] in self.points:
                        self.IB.add(elem[1])
                        limit+=1
            else:
                for i in range(len(rank_list[p]))[::-1]:
                    if limit < q and rank_list[p][i][1] in self.points:
                        self.IB.add(rank_list[p][i][1])
                        limit+=1
        self.ib=_floor(_sqrt(len(self.IB))).__int__()
    def updOB(self):
        """updates the outer border of the cluster.
        for each point inside the cluster scans the related rank list for
        external points: at most k of the nearest same-class points that are
        NOT in this cluster are collected."""
        self.OB.clear()
        outer_space=welt[self.klasse].difference(self.points)
        for p in self.points:
            limit=0
            for elem in rank_list[p]:
                if limit < k and elem[1] in outer_space:
                    self.OB.add(elem[1])
                    limit+=1
        self.ob=_floor(_sqrt(len(self.OB))).__int__()
    def add(self, p):
        """adds a point to the cluster and refreshes all derived fields"""
        if len(self.points) == 0:
            ### adding to a void cluster is a re-initialization
            self.__init__(p)
            return
        self.points.add(p)
        self.updMean()
        self.updVariance()
        self.updIB()
        self.updOB()
    def multiadd(self, other):
        """adds multiple points to the cluster.
        useful to reduce the number of operations needed to maintan the
        consistency.
        note that this does not update cluster's label."""
        self.points.update(other.points)
        self.updMean()
        self.updVariance()
        self.updIB()
        self.updOB()
    def rem(self, p):
        """removes a point from the cluster and refreshes all derived fields"""
        if len(self.points) == 1:
            ### removing the last point empties the cluster entirely
            self.flush()
            return
        self.points.remove(p)
        self.updMean()
        self.updVariance()
        self.updIB()
        self.updOB()
def randomSubset(border, cardinality):
    """returns an arbitrary set of the given cardinality that is subset of
    border.

    uses random.sample so the selection IS uniformly random (set.pop() is
    not); border is copied into a list first because newer versions of
    random.sample (Python 3.11+) refuse set populations."""
    return set(_sample(list(border), cardinality))
def furthest(Y, mean):
    """the point of Y that is furthest from the point mean"""
    ### the precomputed rank lists cannot be used here because mean is not
    ### necessarily part of the training set
    ranked=[(distance(mean, candidate), candidate) for candidate in Y]
    return max(ranked)[1]
def jointVariance(Ca, Cb):
    """joint variance of Ca and Cb: the variance of a throwaway cluster
    holding the points of both"""
    merged=kluster()
    merged.points.update(Ca.points.union(Cb.points))
    merged.updMean()
    merged.updVariance()
    return merged.variance
def gain(Ca, Cb, x):
    """the variance gain obtained if moving x from Cb to Ca.

    works on two scratch clusters so the real Ca and Cb are untouched."""
    receiver=kluster()
    receiver.points.update(Ca.points)
    receiver.updMean()
    receiver.updVariance()
    donor=kluster()
    donor.points.update(Cb.points)
    donor.updMean()
    donor.updVariance()
    ### gain = (old variances) - (new variances after the move)
    delta=receiver.variance+donor.variance
    receiver.points.add(x)
    receiver.updMean()
    receiver.updVariance()
    delta-=receiver.variance
    donor.points.remove(x)
    if len(donor.points) != 0:
        donor.updMean()
        donor.updVariance()
        delta-=donor.variance
    return delta
def computeRLs(kl):
    """computes rank lists for each point in welt[kl].

    the rank list of a point is the list of (distance, point) tuples towards
    every other point of the same class, sorted by increasing distance."""
    for star in welt[kl]:
        ### rebuild from scratch: the original only guarded the creation of
        ### the dict entry (has_key + setdefault), so calling this twice for
        ### the same class appended duplicate entries to every rank list
        rank_list[star]=list()
        for figurant in welt[kl]:
            if figurant == star:
                continue
            rank_list[star].append((distance(star, figurant), figurant))
        rank_list[star].sort()
def mvc(kl, sigmaQuadMax):
    """Maximum variance cluster algorithm.

    partitions the points of class kl (taken from welt[kl]) into clusters
    whose variance should not exceed sigmaQuadMax; returns the set of
    kluster prototypes (some of which may be void).
    runs in epochs, stopping after noChangeMax epochs without change."""
    ### start with one single-point cluster per point
    prototypes=set()
    for xi in welt[kl]:
        prototypes.add(kluster(xi))
    epoch=lastChange=0
    while epoch-lastChange < noChangeMax:
        epoch+=1
        for Ca in prototypes:
            if Ca.isVoid():
                continue
            ### ISOLATION PASS ###
            ### an over-variant cluster expels the furthest of a random
            ### inner-border subset into some void cluster (only during the
            ### first eMax epochs)
            if Ca.variance > sigmaQuadMax and epoch < eMax:
                Y=randomSubset(Ca.IB, Ca.ib)
                x=furthest(Y, Ca.mean)
                #print '%s: ISOLATION, %d' % (Ca.klasse, epoch)
                Ca.rem(x)
                for Cm in prototypes: ### searching for a void cluster
                    if Cm == Ca:
                        continue
                    if Cm.isVoid():
                        Cm.add(x)
                        break
                ### NOTE(review): unlike the union/perturbation passes this
                ### pass does not set lastChange — confirm that is intended
                continue ### if the isolation pass is taken
                         ### choose another cluster
            ### UNION PASS ###
            ### merge Ca with the neighbouring cluster giving the smallest
            ### joint variance, provided it stays under sigmaQuadMax
            if Ca.variance <= sigmaQuadMax:
                sMin=os.sys.maxint ### NOTE(review): os.sys is just sys (Python 2's sys.maxint)
                Cm=None
                for Cb in prototypes:
                    if Cb == Ca:
                        continue
                    ### Cb is a neighbour if it owns outer-border points of Ca
                    if len(Ca.OB.intersection(Cb.points)) != 0:
                        jv=jointVariance(Ca, Cb)
                        if (jv <= sigmaQuadMax) and (jv < sMin):
                            sMin=jv
                            Cm=Cb
                if Cm != None:
                    #print '%s: UNION, %d' % (Ca.klasse, epoch)
                    Ca.multiadd(Cm)
                    Cm.flush()
                    lastChange=epoch
                    continue ### if the union pass is taken
                             ### choose another cluster
            ### PERTURBATION PASS ###
            ### steal the single outer-border point whose transfer to Ca
            ### yields the largest positive variance gain
            Y=randomSubset(Ca.OB, Ca.ob)
            gMax=-os.sys.maxint-1 ### most negative int (Python 2)
            Cm=None
            xMax=None
            for x in Y:
                for Cb in prototypes:
                    if Cb == Ca:
                        continue
                    if x in Cb.points:
                        g=gain(Ca, Cb, x)
                        if g > gMax:
                            gMax=g
                            Cm=Cb
                            xMax=x
            if gMax > 0:
                #print '%s: PERTURBATION, %d' % (Ca.klasse, epoch)
                Ca.add(xMax)
                Cm.rem(xMax)
                lastChange=epoch
    #print 'last run at %d' % (epoch)
    return prototypes
def nsc(prototypes, unclassified):
    """Nearest subclass classifier.

    labels every point of unclassified with the class of the nearest
    prototype (ties broken by the smallest class label) and returns the set
    of the now-labelled points.
    NOTE(review): distance() reads .features off both arguments, so the
    prototypes are expected to be punkt instances (e.g. cluster means);
    passing kluster objects straight from mvc() would fail — confirm callers."""
    classified=set()
    for p in unclassified:
        candidates=[(distance(p, proto), proto.klasse) for proto in prototypes]
        p.klasse=min(candidates)[1]
        classified.add(p)
    return classified