feat: improve clustering by using AffinityPropagation to cluster noise and break up large clusters, then use cluster summary cosine distance to merge very similar clusters
nsantacruz committed May 30, 2024
1 parent 3957aeb commit 4893fd2
Showing 2 changed files with 94 additions and 14 deletions.
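In outline, the change routes HDBSCAN's noise points and any oversized clusters through scikit-learn's AffinityPropagation, then merges clusters whose summary embeddings fall within a cosine-distance threshold. A minimal, self-contained sketch of the AffinityPropagation call the diff standardizes on (toy random vectors stand in for real embeddings; damping=0.7 matches the code below):

import numpy as np
from sklearn.cluster import AffinityPropagation

rng = np.random.default_rng(0)
noise_embeddings = rng.normal(size=(20, 8))  # stand-ins for embedding vectors

model = AffinityPropagation(damping=0.7).fit(noise_embeddings)
labels = model.predict(noise_embeddings)  # one cluster label per former noise item
print(len(set(labels)), "clusters recovered from noise")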
94 changes: 83 additions & 11 deletions app/util/cluster.py
@@ -4,7 +4,7 @@
from tqdm import tqdm
from hdbscan import HDBSCAN
from sklearn.metrics import silhouette_score, pairwise_distances
from sklearn.cluster import KMeans
from sklearn.cluster import KMeans, AffinityPropagation
import random
from dataclasses import dataclass
from numpy import ndarray
@@ -35,9 +35,23 @@ class Cluster:
items: list[AbstractClusterItem]
summary: str = None

def merge(self, other: 'Cluster') -> 'Cluster':
return Cluster(self.label, self.embeddings + other.embeddings, self.items + other.items, self.summary)

def __len__(self):
return len(self.items)

def __eq__(self, other):
if not isinstance(other, Cluster):
return False
return hash(self) == hash(other)

def __ne__(self, other):
return not self.__eq__(other)

def __hash__(self):
return hash((self.label, len(self), self.summary))


class AbstractClusterer(ABC):

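The new __eq__/__ne__/__hash__ trio keys a Cluster on its label, size, and summary, which is what lets _recluster_large_clusters below use set membership on clusters. A small usage sketch with toy values (not from the diff):

import numpy as np

e = [np.zeros(3)]
a = Cluster(1, e, ["item text"], summary="laws of Shabbat")
b = Cluster(1, e, ["other text"], summary="laws of Shabbat")
assert a == b             # same label, same length, same summary; items are ignored
assert len({a, b}) == 1   # safe to deduplicate in sets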
@@ -138,31 +152,66 @@ def make_kmeans_algo_with_optimal_silhouette_score(embeddings: list[np.ndarray])
return KMeans(n_clusters=n_clusters, n_init='auto', random_state=RANDOM_SEED)


class StandardClusterer(AbstractClusterer):
class SklearnClusterer(AbstractClusterer):

def __init__(self, embedding_fn: Callable[[str], ndarray],
get_cluster_model: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]],
get_cluster_labels: Callable[[list[ndarray]], ndarray],
get_cluster_summary: Callable[[list[str]], str] = None,
verbose: bool = True):
verbose: bool = True, breakup_large_clusters: bool = True):
"""
:param embedding_fn:
:param get_cluster_model:
:param verbose:
"""
super().__init__(embedding_fn, get_cluster_summary, verbose)
self.get_cluster_model = get_cluster_model
self._get_cluster_labels = get_cluster_labels
self._breakup_large_clusters = breakup_large_clusters


def clone(self, **kwargs) -> 'SklearnClusterer':
"""
Return new object with all the same data except modifications specified in kwargs
"""
init_params = {}
for key, value in self.__dict__.items():
if key.startswith("_"):
key = key[1:]
init_params[key] = value
return self.__class__(**{**init_params, **kwargs})

def _get_large_clusters(self, clusters: list[Cluster]) -> list[Cluster]:
from statistics import mean, stdev
large_clusters = []
for cluster in clusters:
other_cluster_lens = [len(c) for c in clusters if c != cluster]
if len(cluster) > (mean(other_cluster_lens) + 3*stdev(other_cluster_lens)):
large_clusters.append(cluster)
return large_clusters

def _recluster_large_clusters(self, clusters: list[Cluster]) -> list[Cluster]:
large_clusters = set(self._get_large_clusters(clusters))
other_clusters = [c for c in clusters if c not in large_clusters]
affinity_clusterer = self.clone(
get_cluster_labels=lambda x: AffinityPropagation(damping=0.7).fit(x).predict(x), breakup_large_clusters=False
)
items_to_recluster = reduce(lambda x, y: x + y.items, large_clusters, [])
reclustered_clusters = affinity_clusterer.cluster_items(items_to_recluster)
print("RECLUSTERED LEN", len(reclustered_clusters))
return other_clusters + reclustered_clusters

def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]:
"""
:param items: Generic list of items to cluster
:return: list of Cluster objects
"""
embeddings = self.embed_parallel(items, lambda x: x.get_str_to_embed(), max_workers=40, desc="embedding items for clustering")
cluster_results = self.get_cluster_model(embeddings).fit(embeddings)
clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results.labels_, embeddings, items)
labels = self._get_cluster_labels(embeddings)
clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(labels, embeddings, items)
if self._breakup_large_clusters:
clusters = self._recluster_large_clusters(clusters)
if len(noise_items) > 0:
noise_results = make_kmeans_algo_with_optimal_silhouette_score(noise_embeddings).fit(noise_embeddings)
noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results.labels_, noise_embeddings, noise_items)
noise_labels = AffinityPropagation(damping=0.7).fit(noise_embeddings).predict(noise_embeddings)
noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_labels, noise_embeddings, noise_items)
if self._verbose:
print("LEN NOISE_CLUSTERS", len(noise_clusters))
else:
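For intuition, _get_large_clusters flags a cluster as large when its size exceeds the mean of the other clusters' sizes by more than three standard deviations; since statistics.stdev needs at least two data points, the rule presumes at least three clusters exist. A worked toy example (numbers are mine):

from statistics import mean, stdev

sizes = [4, 5, 6, 40]                         # last cluster is suspiciously big
others = sizes[:-1]
threshold = mean(others) + 3 * stdev(others)  # 5.0 + 3 * 1.0 = 8.0
print(sizes[-1] > threshold)                  # True -> re-clustered via AffinityPropagation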
@@ -183,14 +232,36 @@ def _embed_cluster_summaries(self, summarized_clusters: list[Cluster]):
summarized_clusters, lambda x: x.summary, max_workers=40, desc="embedding cluster summaries to score"
)

def _collapse_similar_clusters(self, clusters: list[Cluster]) -> list[Cluster]:
embeddings = self._embed_cluster_summaries(clusters)
distances = pairwise_distances(embeddings, metric='cosine')
threshold = 0.3
num_clusters = len(clusters)

# Keep track of which clusters have been merged
merged = [False] * num_clusters
new_clusters = []

for i in range(num_clusters):
if not merged[i]:
merged[i] = True
merged_cluster = clusters[i]
for j in range(i + 1, num_clusters):
if not merged[j] and distances[i, j] < threshold:
merged[j] = True
print(f"MERGING {round(distances[i, j], 4)}\n- ({len(merged_cluster)}){merged_cluster.summary}\n- ({len(clusters[j])}){clusters[j].summary}")
merged_cluster = merged_cluster.merge(clusters[j])
new_clusters.append(merged_cluster)
return new_clusters
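The collapse is a single greedy pass: each not-yet-merged cluster absorbs every later cluster whose summary embedding lies within cosine distance 0.3 of it, so the outcome can depend on cluster order. A toy trace under assumed distances:

# Suppose distances[0,1] = 0.1, distances[0,2] = 0.5, distances[1,2] = 0.2.
# i=0: merges cluster 1 (0.1 < 0.3), keeps cluster 2 out (0.5 >= 0.3)
# i=2: never merged, becomes its own cluster
# Result: [clusters 0+1, cluster 2], even though 1 and 2 were only 0.2 apart.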

def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbose=True) -> float:
embeddings = self._embed_cluster_summaries(summarized_clusters)
distances = pairwise_distances(embeddings, metric='cosine')
np.fill_diagonal(distances, np.inf)
min_distances = np.min(distances, axis=1)
num_clusters = len(summarized_clusters)
avg_min_distance = sum(min_distances)/len(min_distances)
max_clusters = 5*self.IDEAL_NUM_CLUSTERS # used to ensure closeness_to_ideal_score is bounded
max_clusters = 2*self.IDEAL_NUM_CLUSTERS # used to ensure closeness_to_ideal_score is bounded
closeness_to_ideal_score = (1 - (min(abs(num_clusters - self.IDEAL_NUM_CLUSTERS), max_clusters)/max_clusters)) * 0.7
clustering_score = closeness_to_ideal_score + avg_min_distance
return clustering_score
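To see how the score trades cluster count against separation, a worked example with assumed values (IDEAL_NUM_CLUSTERS = 10 is an illustration, not from the diff):

IDEAL_NUM_CLUSTERS = 10        # assumed for illustration
num_clusters = 14
avg_min_distance = 0.42        # mean cosine distance to each cluster's nearest neighbor

max_clusters = 2 * IDEAL_NUM_CLUSTERS  # 20; bounds the closeness penalty
closeness = (1 - min(abs(num_clusters - IDEAL_NUM_CLUSTERS), max_clusters) / max_clusters) * 0.7
score = closeness + avg_min_distance   # (1 - 4/20) * 0.7 + 0.42 = 0.98
print(round(score, 2))

Halving the bound from 5x to 2x the ideal count makes each cluster of deviation from the ideal cost more of the 0.7 closeness weight.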
@@ -201,11 +272,12 @@ def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]:
for clusterer in self._clusterers:
curr_clusters = clusterer.cluster_items(items)
summarized_clusters = clusterer.summarize_clusters(curr_clusters)
summarized_clusters = self._collapse_similar_clusters(summarized_clusters)
clustering_score = self._calculate_clustering_score(summarized_clusters)
print("CLUSTER SCORE: ", clustering_score)
if clustering_score > highest_clustering_score:
highest_clustering_score = clustering_score
best_clusters = curr_clusters
best_clusters = summarized_clusters
return best_clusters

def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]:
14 changes: 11 additions & 3 deletions experiments/topic_source_curation/cluster.py
@@ -4,12 +4,13 @@
from basic_langchain.schema import HumanMessage, SystemMessage
import random
from hdbscan import HDBSCAN
from sklearn.cluster import AffinityPropagation
from sefaria_llm_interface.topic_prompt import TopicPromptSource
from sefaria_llm_interface.common.topic import Topic
from basic_langchain.embeddings import VoyageAIEmbeddings, OpenAIEmbeddings
from util.pipeline import Artifact
from util.general import get_by_xml_tag
from util.cluster import Cluster, OptimizingClusterer, StandardClusterer, AbstractClusterItem
from util.cluster import Cluster, OptimizingClusterer, SklearnClusterer, AbstractClusterItem
from experiments.topic_source_curation.common import get_topic_str_for_prompts
from experiments.topic_source_curation.summarized_source import SummarizedSource
import numpy as np
@@ -83,7 +84,14 @@ def _cluster_sources(sources: list[SummarizedSource], topic) -> list[Cluster]:
clusterers = []
for i in range(len(HDBSCAN_PARAM_OPTS['min_samples'])):
hdbscan_params = _get_ith_hdbscan_params(i)
clusterers.append(StandardClusterer(embed_text_openai, lambda x: HDBSCAN(**hdbscan_params),
partial(_get_cluster_summary_based_on_topic, topic_desc)))
temp_clusterer = SklearnClusterer(embed_text_openai,
lambda x, params=hdbscan_params: HDBSCAN(**params).fit(x).labels_,  # bind params at definition time; a bare closure would capture the loop's final hdbscan_params
partial(_get_cluster_summary_based_on_topic, topic_desc))
clusterers.append(temp_clusterer)

# temp_clusterer = SklearnClusterer(embed_text_openai,
# lambda x: AffinityPropagation(damping=0.7).fit(x).predict(x),
# partial(_get_cluster_summary_based_on_topic, topic_desc))
# clusterers.append(temp_clusterer)
clusterer_optimizer = OptimizingClusterer(embed_text_openai, clusterers)
return clusterer_optimizer.cluster_and_summarize(sources)
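End to end, usage is unchanged by the rename; a hypothetical driver sketch (sources: list[SummarizedSource] and topic: Topic assumed to be loaded already):

clusters = _cluster_sources(sources, topic)  # summarized, merged Cluster objects
for c in clusters:
    print(len(c), c.summary)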
