# Normalize.py
# Forked from KatherLab/preProcessing.
"""
@author: Narmin Ghaffari Laleh <narminghaffari23@gmail.com> - Nov 2020
"""
##############################################################################
from multiprocessing.dummy import Pool as ThreadPool
import stainNorm_Macenko
import multiprocessing
import os
import cv2
import numpy as np
# Normalize_Main() reads `inputPath`, `outputPath` and `normalizer` from
# module scope. A `global` statement at module level has no effect: it neither
# creates nor initializes these names, so they must be assigned elsewhere
# before any worker runs.
global inputPath
global outputPath
global normalizer
##############################################################################
def Normalize_Main(item):
    """Stain-normalize the ``.jpg`` tiles of one sub-folder.

    Reads the module-level names ``inputPath``, ``outputPath`` and
    ``normalizer``; all three must be set before this function is called.

    For every ``.jpg`` file in ``inputPath/item`` the tile is loaded, the
    percentage of pixels that ``cv2.Canny(img, 40, 100)`` marks as edges is
    computed and printed, and tiles whose percentage is greater than 2 are
    passed through ``normalizer.transform`` and written under the same file
    name to ``outputPath/item``. Tiles at or below the threshold are skipped
    (presumably background or blurry tiles -- confirm).

    Unreadable tiles and tiles whose normalization raises are reported on
    stdout and skipped; they do not abort the rest of the folder.

    NOTE(review): an already existing output folder is reused and tiles in it
    are overwritten -- confirm that skipping such folders was not intended.

    Args:
        item: Name of a sub-directory of ``inputPath``.

    Returns:
        None.
    """
    inputPathRoot = os.path.join(inputPath, item)
    outputPathRoot = os.path.join(outputPath, item)

    folderContent = os.listdir(inputPathRoot)
    if not folderContent:
        # Nothing to do; do not create an empty output folder.
        return

    # exist_ok avoids a check-then-create race and tolerates missing parents.
    os.makedirs(outputPathRoot, exist_ok=True)

    tileNames = [name for name in folderContent if name.endswith('.jpg')]
    for tempItem in tileNames:
        img = cv2.imread(os.path.join(inputPathRoot, tempItem))
        if img is None:
            # cv2.imread reports an unreadable file by returning None instead
            # of raising; without this guard cvtColor would crash the worker.
            print('Failed to read the tile {}.'.format(tempItem))
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Canny returns a single-channel map that is non-zero only on edge
        # pixels, so counting non-zeros gives the edge-pixel percentage
        # directly and avoids a 0/0 division for tiles without any edge.
        edgeMap = cv2.Canny(img, 40, 100)
        edge = np.count_nonzero(edgeMap) / edgeMap.size * 100
        print(edge)
        if edge <= 2:
            continue

        try:
            nor_img = normalizer.transform(img)
            cv2.imwrite(os.path.join(outputPathRoot, tempItem),
                        cv2.cvtColor(nor_img, cv2.COLOR_RGB2BGR))
        except Exception as exc:
            # Best effort per tile: report the reason and continue.
            print('Failed to normalize the tile {}.'.format(tempItem), repr(exc))
##############################################################################
def poolcontext(*args, **kwargs):
    """Yield a ``multiprocessing.Pool`` once and terminate it afterwards.

    All positional and keyword arguments are forwarded unchanged to
    ``multiprocessing.Pool``.

    This is a plain generator function: it yields exactly one pool. The pool
    is terminated when the generator is resumed after the yield, when it is
    closed, or when an exception is thrown into it.

    NOTE(review): the name suggests it was meant to be wrapped with
    ``contextlib.contextmanager`` so it can be used in a ``with`` statement;
    as written here it is not decorated -- confirm the intended usage.

    Yields:
        The newly created ``multiprocessing.Pool``.
    """
    pool = multiprocessing.Pool(*args, **kwargs)
    try:
        yield pool
    finally:
        # Runs on normal resumption, on generator close and on errors, so the
        # worker processes are never left behind.
        pool.terminate()
###############################################################################
def Normalization(inputPath, outputPath, sampleImagePath, num_threads = 8):
    """Normalize every not-yet-processed sub-folder of ``inputPath``.

    A ``stainNorm_Macenko.Normalizer`` is fitted on the image at
    ``sampleImagePath``. Every entry of ``inputPath`` that has no entry of
    the same name in ``outputPath`` and does not end in ``.bat`` or ``.txt``
    is then handed to ``Normalize_Main`` on a thread pool.

    ``Normalize_Main`` takes only the folder name and reads ``inputPath``,
    ``outputPath`` and ``normalizer`` from module scope, so this function
    publishes its arguments and the fitted normalizer there before starting
    the workers.

    Args:
        inputPath: Directory whose entries are the folders to normalize.
        outputPath: Existing directory that receives the normalized folders.
        sampleImagePath: Path of the reference image the normalizer is
            fitted on.
        num_threads: Number of worker threads in the pool.

    Returns:
        None.
    """
    inputPathContent = os.listdir(inputPath)
    # A set makes the "already normalized?" test O(1) per entry.
    alreadyNormalized = set(os.listdir(outputPath))
    pending = [
        name for name in inputPathContent
        if name not in alreadyNormalized
        and not name.endswith(('.bat', '.txt'))
    ]

    target = cv2.imread(sampleImagePath)
    target = cv2.cvtColor(target, cv2.COLOR_BGR2RGB)
    fittedNormalizer = stainNorm_Macenko.Normalizer()
    fittedNormalizer.fit(target)

    # The parameters shadow the module-level names, so a `global` statement
    # cannot be used here. Without this step the workers would not see the
    # fitted normalizer at all and every tile would fail.
    globals().update(
        inputPath=inputPath,
        outputPath=outputPath,
        normalizer=fittedNormalizer,
    )

    pool = ThreadPool(num_threads)
    try:
        pool.map(Normalize_Main, pending)
    finally:
        # Always release the worker threads, even if a worker raised.
        pool.close()
        pool.join()
###############################################################################
# Script configuration: fill in the three paths before running this file.
# `inputPath` and `outputPath` are also the module-level names that
# Normalize_Main() reads, so they are assigned at module scope.
# NOTE(review): the call below runs whenever this module is executed or
# imported -- confirm that import-time execution is intended.
inputPath = r""
outputPath = r""
sampleImagePath = r""
Normalization(
    inputPath=inputPath,
    outputPath=outputPath,
    sampleImagePath=sampleImagePath,
    num_threads=2,
)