diff --git a/.gitignore b/.gitignore
index b6e4761..cfb2f71 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,9 @@
+ocr_output/
+array.pkl
+grouped_clusters.json
+clustering_output/
+output_images/
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
diff --git a/pyproject.toml b/pyproject.toml
index a7d2901..c6bcdef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,8 @@ dependencies = [
     "numpy",
     "pillow",
     "tensorflow",
+    "scikit-learn",
+    "reportlab",
 ]
 
 [project.optional-dependencies]
diff --git a/src/monocheck/clustering.py b/src/monocheck/clustering.py
new file mode 100644
index 0000000..c5a1bda
--- /dev/null
+++ b/src/monocheck/clustering.py
@@ -0,0 +1,100 @@
+"""Cluster image feature vectors and export every cluster as a PDF."""
+
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, List
+
+import numpy as np
+from PIL import Image
+from reportlab.lib.pagesizes import letter
+from reportlab.pdfgen import canvas
+from sklearn.cluster import KMeans
+
+PAGE_MARGIN = 72  # one inch expressed in PDF points
+IMAGE_GAP = 10  # vertical gap between two stacked images, in points
+
+
+def cluster(features: np.ndarray, no_of_clusters: int = 2) -> np.ndarray:
+    """Fit k-means on ``features`` and return one cluster label per row.
+
+    ``random_state`` is pinned so that repeated runs on the same features
+    yield the same labels.
+    """
+    kmeans = KMeans(n_clusters=no_of_clusters, random_state=22)
+    kmeans.fit(features)
+
+    print("[SUCCESS]: Clustering succesfully done")
+    return kmeans.labels_
+
+
+def group_clusters(
+    files_paths: List[Path], cluster_labels: np.ndarray
+) -> Dict[int, List[str]]:
+    """Map every cluster label to the file paths assigned to it.
+
+    Labels are converted to built-in ``int`` and paths to ``str`` so the
+    result can be handed to ``json.dump`` as is.
+
+    Raises:
+        ValueError: if the numbers of paths and labels differ; ``zip`` would
+            otherwise silently drop the unmatched tail.
+    """
+    if len(files_paths) != len(cluster_labels):
+        raise ValueError(
+            f"got {len(files_paths)} paths but {len(cluster_labels)} labels"
+        )
+
+    groups = defaultdict(list)
+    for file, label in zip(files_paths, cluster_labels):
+        groups[int(label)].append(str(file))
+    return dict(groups)
+
+
+def view_clusters(grouped_clusters, save_path='output_images'):
+    """Write the images of each cluster to ``<save_path>/cluster_<id>.pdf``.
+
+    Images are stacked top to bottom on letter-size pages. An image is only
+    ever shrunk, never enlarged, to fit inside the page margins, and a new
+    page is started after ``images_per_page`` images or as soon as the next
+    image would cross the bottom margin.
+    """
+    images_per_page = 10
+    page_width, page_height = letter
+    max_width = page_width - 2 * PAGE_MARGIN
+    max_height = page_height - 2 * PAGE_MARGIN
+    top = page_height - PAGE_MARGIN
+
+    # Saving a PDF into a directory that does not exist yet would fail.
+    Path(save_path).mkdir(parents=True, exist_ok=True)
+
+    for cluster_id, files in grouped_clusters.items():
+        pdf_path = f'{save_path}/cluster_{cluster_id}.pdf'
+        pdf = canvas.Canvas(pdf_path, pagesize=letter)
+
+        y_position = top
+        images_on_page = 0
+        for file in files:
+            # Only the size is needed; release the file handle right away.
+            with Image.open(file) as img:
+                img_width, img_height = img.size
+
+            scale = min(1.0, max_width / img_width, max_height / img_height)
+            img_width *= scale
+            img_height *= scale
+
+            page_is_full = images_on_page >= images_per_page
+            overflows = y_position - img_height < PAGE_MARGIN
+            if images_on_page and (page_is_full or overflows):
+                pdf.showPage()
+                y_position = top
+                images_on_page = 0
+
+            pdf.drawImage(
+                file, PAGE_MARGIN, y_position - img_height,
+                width=img_width, height=img_height,
+            )
+            y_position -= img_height + IMAGE_GAP
+            images_on_page += 1
+
+        pdf.save()
+        print(f"Cluster {cluster_id} images saved to {pdf_path}")
diff --git a/src/monocheck/dimension_reduction.py b/src/monocheck/dimension_reduction.py
index e792b8e..d41d51a 100644
--- a/src/monocheck/dimension_reduction.py
+++ b/src/monocheck/dimension_reduction.py
@@ -7,5 +7,6 @@ def reduce_dimension(images_feature: np.ndarray, components:int = 100):
     pca.fit(images_feature)
     reduced_images_feature = pca.transform(images_feature)
 
+    print("[SUCCESS]: Image features dimensions reduction successfully done.")
     return reduced_images_feature
 
diff --git a/src/monocheck/feature_extraction.py b/src/monocheck/feature_extraction.py
index daf9070..dfb8f9c 100644
--- a/src/monocheck/feature_extraction.py
+++ b/src/monocheck/feature_extraction.py
@@ -4,9 +4,34 @@
 
 
-def extract_features(images_input: np.ndarray, model:VGG16):
-    images_features = model.predict(images_input)
-
-    return images_features
+def extract_features(images_input: np.ndarray, model:VGG16, batch_size:int = 1000):
+    """Run ``model`` over ``images_input`` in chunks and stack the outputs.
+
+    Args:
+        images_input: image batch whose first axis indexes the images.
+        model: network whose ``predict`` output is used as the features.
+        batch_size: number of images handed to ``model.predict`` per call.
+
+    Returns:
+        The per-chunk predictions stacked along the first axis.
+
+    Raises:
+        ValueError: if ``images_input`` is empty or ``batch_size`` < 1.
+    """
+    num_images = images_input.shape[0]
+    if num_images == 0:
+        raise ValueError("images_input is empty; there is nothing to extract")
+    if batch_size < 1:
+        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
+
+    all_features = []
+    for start in range(0, num_images, batch_size):
+        # Clamp so the final, possibly shorter, chunk is handled as well.
+        end = min(start + batch_size, num_images)
+        all_features.append(model.predict(images_input[start:end]))
+        print(f"[{end}/{num_images}] image features extraction done.")
+
+    print("[SUCCESS]: Image features extraction done.")
+    return np.vstack(all_features)
 
 
 
diff --git a/src/monocheck/pipeline.py b/src/monocheck/pipeline.py
index 9ec6b5d..f537d3b 100644
--- a/src/monocheck/pipeline.py
+++ b/src/monocheck/pipeline.py
@@ -1,4 +1,6 @@
 import numpy as np
+import json
+import pickle
 from pathlib import Path
 from typing import List
 
@@ -8,22 +10,80 @@
 from monocheck.prepare import load_image
 from monocheck.feature_extraction import extract_features
 from monocheck.dimension_reduction import reduce_dimension
+from monocheck.clustering import cluster, group_clusters, view_clusters
 
 
-def pipeline(image_paths:List[Path]):
+IMAGE_FEATURES_PICKLE = Path('array.pkl')
+
+
+def _load_cached_features(image_paths):
+    """Return the cached features of exactly ``image_paths``, else ``None``."""
+    if not IMAGE_FEATURES_PICKLE.exists():
+        return None
+    # NOTE: unpickling can execute arbitrary code; that is acceptable only
+    # because the cache is a local file written by this module itself.
+    with open(IMAGE_FEATURES_PICKLE, 'rb') as file:
+        cached = pickle.load(file)
+    # A cache written for other images (or in the old bare-array layout) has
+    # rows that do not line up with ``image_paths``, so it is ignored.
+    paths = [str(path) for path in image_paths]
+    if isinstance(cached, dict) and cached.get('paths') == paths:
+        return cached['features']
+    return None
+
+
+def _compute_features(image_paths):
+    """Extract the VGG16 features of ``image_paths`` and cache them."""
     imgs_array = [load_image(image_path).squeeze(0) for image_path in image_paths]
     imgs_array = np.stack(imgs_array, axis=0)
 
     model = VGG16()
     model = Model(inputs = model.inputs, outputs = model.layers[-2].output)
 
     imgs_features = extract_features(imgs_array, model)
     imgs_features = imgs_features.reshape(-1,4096)
+
+    cache = {'paths': [str(path) for path in image_paths], 'features': imgs_features}
+    with open(IMAGE_FEATURES_PICKLE, 'wb') as file:
+        pickle.dump(cache, file)
+    return imgs_features
+
+
+def pipeline(image_paths:List[Path], output_file_path:Path=Path('grouped_clusters.json')):
+    """Cluster the images at ``image_paths`` and save the groups as JSON.
+
+    Features come from the second-to-last VGG16 layer and are cached in
+    ``IMAGE_FEATURES_PICKLE`` together with the paths they belong to, so the
+    images and the network are only loaded when that cache is missing or
+    was written for a different list of images.
+
+    Returns:
+        Dict mapping each cluster label to the paths grouped under it; the
+        same mapping is written to ``output_file_path``.
+
+    Raises:
+        ValueError: if ``image_paths`` is empty.
+    """
+    image_paths = list(image_paths)
+    if not image_paths:
+        raise ValueError("image_paths is empty; there is nothing to cluster")
+
+    imgs_features = _load_cached_features(image_paths)
+    if imgs_features is None:
+        imgs_features = _compute_features(image_paths)
+
     reduced_imgs_features = reduce_dimension(imgs_features)
-    return reduced_imgs_features
+    clustering_labels = cluster(reduced_imgs_features)
+    cluster_groups = group_clusters(image_paths, clustering_labels)
+
+    with open(output_file_path, 'w') as file:
+        json.dump(cluster_groups, file, indent=4)
+    return cluster_groups
+
 
 if __name__ == "__main__":
-    imgs_path = [Path("image.jpg"), Path("image2.jpg")]
-    imgs_feat = pipeline(imgs_path)
-    print(imgs_feat)
+    # Sorted so the path order, and with it the feature cache, stays stable.
+    imgs_path = sorted(Path("ocr_output").rglob("*.jpg"))
+    grouped_clusters = pipeline(imgs_path)
+    view_clusters(grouped_clusters)
 