diff --git a/app/util/cluster.py b/app/util/cluster.py index 30cf4e5..12edbe1 100644 --- a/app/util/cluster.py +++ b/app/util/cluster.py @@ -14,14 +14,6 @@ from util.general import get_by_xml_tag import numpy as np -def _topic_summarize_cluster_prompt(context, strs_to_summarize): - system = SystemMessage(content="You are a Jewish scholar familiar with Torah. Given a few ideas (wrapped in " - "XML tags) about a given topic (wrapped in XML tags) output a summary of the" - "ideas as they related to the topic. Wrap the output in tags. Summary" - "should be no more than 10 words.") - human = HumanMessage(content=f"{context}{''.join(strs_to_summarize)}") - return [system, human] - RANDOM_SEED = 567454 random.seed(RANDOM_SEED) @@ -56,8 +48,37 @@ def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluste pass +def _guess_optimal_kmeans_clustering(embeddings, verbose=True): + if len(embeddings) <= 1: + return len(embeddings) + + best_sil_coeff = -1 + best_num_clusters = 0 + MAX_MIN_CLUSTERS = 3 # the max start of the search for optimal cluster number. + n_cluster_start = min(len(embeddings), MAX_MIN_CLUSTERS) + n_cluster_end = len(embeddings)//2 + if n_cluster_end < (n_cluster_start + 1): + n_cluster_start = 2 + n_cluster_end = n_cluster_start + 1 + n_clusters = range(n_cluster_start, n_cluster_end) + for n_cluster in tqdm(n_clusters, total=len(n_clusters), desc='guess optimal clustering', disable=not verbose): + kmeans = KMeans(n_clusters=n_cluster, n_init='auto', random_state=RANDOM_SEED).fit(embeddings) + labels = kmeans.labels_ + sil_coeff = silhouette_score(embeddings, labels, metric='cosine', random_state=RANDOM_SEED) + if sil_coeff > best_sil_coeff: + best_sil_coeff = sil_coeff + best_num_clusters = n_cluster + if verbose: + print("Best silhouette score", round(best_sil_coeff, 4)) + return best_num_clusters + -class Clusterer(AbstractClusterer): +def make_kmeans_algo_with_optimal_silhouette_score(embeddings: list[np.ndarray]): + n_clusters = _guess_optimal_kmeans_clustering(embeddings) + return KMeans(n_clusters=n_clusters, n_init='auto', random_state=RANDOM_SEED) + + +class StandardClusterer(AbstractClusterer): def __init__(self, embedding_fn: Callable[[str], ndarray], get_cluster_algo: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]], @@ -75,17 +96,12 @@ def __init__(self, embedding_fn: Callable[[str], ndarray], self.get_cluster_summary = get_cluster_summary or self._default_get_cluster_summary - def clone(self, **kwargs) -> 'Clusterer': + def clone(self, **kwargs) -> 'StandardClusterer': """ Return new object with all the same data except modifications specified in kwargs """ return self.__class__(**{**self.__dict__, **kwargs}) - @staticmethod - def _make_kmeans_algo(embeddings: list[np.ndarray]): - n_clusters = Clusterer._guess_optimal_kmeans_clustering(embeddings) - return KMeans(n_clusters=n_clusters, n_init='auto', random_state=RANDOM_SEED) - @staticmethod def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str: llm = ChatOpenAI("gpt-4o", 0) @@ -98,7 +114,7 @@ def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str: return get_by_xml_tag(response.content, "summary") - def _summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster: + def summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster: """ :param cluster: Cluster to summarize @@ -119,11 +135,11 @@ def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = :param cluster_noise: :return: list of Cluster objects """ - embeddings = run_parallel([item.get_str_to_embed() for item in items], self.embedding_fn, max_workers=40, desc="embedding items for clustering", disable=not verbose) - cluster_results = self.get_cluster_algo(embeddings, items).fit(embeddings) + embeddings = run_parallel([item.get_str_to_embed() for item in items], self.embedding_fn, max_workers=40, desc="embedding items for clustering", disable=not self.verbose) + cluster_results = self.get_cluster_algo(embeddings).fit(embeddings) clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results, embeddings, items) if cluster_noise: - noise_results = self._make_kmeans_algo(noise_embeddings, items).fit(noise_embeddings) + noise_results = make_kmeans_algo_with_optimal_silhouette_score(noise_embeddings).fit(noise_embeddings) noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results, noise_embeddings, noise_items) if self.verbose: print("LEN NOISE_CLUSTERS", len(noise_clusters)) @@ -132,7 +148,7 @@ def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = return clusters + noise_clusters def cluster_and_summarize(self, items: list[AbstractClusterItem], **kwargs) -> list[Cluster]: - return [self._summarize_cluster(c, **kwargs) for c in self.cluster_items(items)] + return [self.summarize_cluster(c, **kwargs) for c in self.cluster_items(items)] def _build_clusters_from_cluster_results(self, cluster_results: Union[KMeans, HDBSCAN], embeddings, items): @@ -152,30 +168,6 @@ def _build_clusters_from_cluster_results(self, cluster_results: Union[KMeans, HD clusters += [Cluster(label, curr_embeddings, curr_items)] return clusters, noise_items, noise_embeddings - @staticmethod - def _guess_optimal_kmeans_clustering(embeddings, verbose=True): - if len(embeddings) <= 1: - return len(embeddings) - - best_sil_coeff = -1 - best_num_clusters = 0 - MAX_MIN_CLUSTERS = 3 # the max start of the search for optimal cluster number. - n_cluster_start = min(len(embeddings), MAX_MIN_CLUSTERS) - n_cluster_end = len(embeddings)//2 - if n_cluster_end < (n_cluster_start + 1): - n_cluster_start = 2 - n_cluster_end = n_cluster_start + 1 - n_clusters = range(n_cluster_start, n_cluster_end) - for n_cluster in tqdm(n_clusters, total=len(n_clusters), desc='guess optimal clustering', disable=not verbose): - kmeans = KMeans(n_clusters=n_cluster, n_init='auto', random_state=RANDOM_SEED).fit(embeddings) - labels = kmeans.labels_ - sil_coeff = silhouette_score(embeddings, labels, metric='cosine', random_state=RANDOM_SEED) - if sil_coeff > best_sil_coeff: - best_sil_coeff = sil_coeff - best_num_clusters = n_cluster - if verbose: - print("Best silhouette score", round(best_sil_coeff, 4)) - return best_num_clusters class HDBSCANOptimizerClusterer(AbstractClusterer): @@ -187,7 +179,7 @@ class HDBSCANOptimizerClusterer(AbstractClusterer): "cluster_selection_epsilon": [0.65, 0.5], } - def __init__(self, clusterer: Clusterer, verbose=True): + def __init__(self, clusterer: StandardClusterer, verbose=True): # TODO param to avoid clustering noise cluster which may mess up optimizer self.clusterer = clusterer self.param_search_len = len(self.HDBSCAN_PARAM_OPTS["min_samples"]) @@ -201,7 +193,8 @@ def _embed_cluster_summaries(self, summarized_clusters: list[Cluster]): ) def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbose=True) -> float: - distances = pairwise_distances(self._embed_cluster_summaries(summarized_clusters), metric='cosine') + embeddings = self._embed_cluster_summaries(summarized_clusters) + distances = pairwise_distances(embeddings, metric='cosine') np.fill_diagonal(distances, np.inf) min_distances = np.min(distances, axis=1) num_clusters = len(summarized_clusters) @@ -220,7 +213,7 @@ def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = for i in range(self.param_search_len): curr_hdbscan_obj = HDBSCAN(**self._get_ith_hdbscan_params(i)) curr_clusterer = self.clusterer.clone(get_cluster_algo=lambda x: curr_hdbscan_obj) - curr_clusters = self.clusterer.cluster_items(items, cluster_noise=False) + curr_clusters = self.clusterer.cluster_items(items, cluster_noise=True) summarized_clusters = [self.clusterer.summarize_cluster(c) for c in curr_clusters] clustering_score = self._calculate_clustering_score(summarized_clusters) if clustering_score > highest_clustering_score: @@ -228,3 +221,13 @@ def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = best_clusterer = curr_clusterer return best_clusterer.cluster_items(items, cluster_noise=cluster_noise) + def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]: + clusters = self.cluster_items(items, cluster_noise=True) + summarized_clusters = [self.clusterer.summarize_cluster(c) for c in clusters] + if self.verbose: + print(f'---SUMMARIES--- ({len(summarized_clusters)})') + for cluster in summarized_clusters: + print('\t-', len(cluster.items), cluster.summary.strip()) + return summarized_clusters + + diff --git a/experiments/topic_source_curation_v2/cluster.py b/experiments/topic_source_curation_v2/cluster.py index f78b38f..414252e 100644 --- a/experiments/topic_source_curation_v2/cluster.py +++ b/experiments/topic_source_curation_v2/cluster.py @@ -1,23 +1,20 @@ -from typing import Any, Callable, Union +from typing import Callable, Union from functools import partial from tqdm import tqdm -from basic_langchain.embeddings import VoyageAIEmbeddings -from basic_langchain.embeddings import OpenAIEmbeddings -import hdbscan -from sklearn.metrics import silhouette_score, pairwise_distances -from sklearn.cluster import KMeans +from basic_langchain.chat_models import ChatOpenAI +from basic_langchain.schema import HumanMessage, SystemMessage import random -from concurrent.futures import ThreadPoolExecutor +from hdbscan import HDBSCAN from dataclasses import dataclass, asdict -from numpy import ndarray from topic_prompt.uniqueness_of_source import summarize_based_on_uniqueness from sefaria_llm_interface.topic_prompt import TopicPromptSource from sefaria_llm_interface.common.topic import Topic -from basic_langchain.chat_models import ChatOpenAI, ChatAnthropic -from basic_langchain.schema import HumanMessage, SystemMessage +from basic_langchain.chat_models import ChatAnthropic +from basic_langchain.embeddings import VoyageAIEmbeddings, OpenAIEmbeddings from experiments.topic_source_curation_v2.common import run_parallel -from util.general import get_by_xml_tag from util.pipeline import Artifact +from util.general import get_by_xml_tag +from util.cluster import Cluster, HDBSCANOptimizerClusterer, StandardClusterer, AbstractClusterItem import numpy as np RANDOM_SEED = 567454 @@ -25,17 +22,7 @@ @dataclass -class Cluster: - label: int - embeddings: list[ndarray] - items: list[Any] - summary: str = None - - def __len__(self): - return len(self.items) - -@dataclass -class SummarizedSource: +class SummarizedSource(AbstractClusterItem): source: TopicPromptSource summary: str @@ -49,6 +36,14 @@ def serialize(self) -> dict: serial['embedding'] = self.embedding.tolist() if self.embedding is not None else None return serial + def get_str_to_summarize(self) -> str: + text = self.source.text + return text.get('en', text.get('he', 'N/A')) + + def get_str_to_embed(self) -> str: + return self.summary + + def get_clustered_sources_based_on_summaries(sources: list[TopicPromptSource], topic: Topic) -> list[Cluster]: """ @@ -57,8 +52,7 @@ def get_clustered_sources_based_on_summaries(sources: list[TopicPromptSource], t :param topic: :return: """ - key = lambda source: source.summary - return Artifact(sources).pipe(_summarize_sources_parallel, topic).pipe(_cluster_sources, key, topic).pipe(summarize_source_clusters, topic).data + return Artifact(sources).pipe(_summarize_sources_parallel, topic).pipe(_cluster_sources, topic).data def get_text_from_source(source: Union[TopicPromptSource, SummarizedSource]) -> str: @@ -99,144 +93,26 @@ def embed_text_openai(text): def embed_text_voyageai(text): return np.array(VoyageAIEmbeddings(model="voyage-large-2-instruct").embed_query(text)) -def _make_kmeans_algo(embeddings: np.ndarray, items, topic): - n_clusters = _guess_optimal_clustering(embeddings) - return KMeans(n_clusters=n_clusters, n_init='auto', random_state=RANDOM_SEED) - -def _make_hdbscan_algo(embeddings: np.ndarray, items, topic, verbose=True): - IDEAL_NUM_CLUSTERS = 20 - hdbscan_params = { - "min_samples": [1, 1], - "min_cluster_size": [2, 2], - "cluster_selection_method": ["eom", "leaf"], - "cluster_selection_epsilon": [0.65, 0.5], - } - best_hdbscan_obj = None - highest_clustering_score = 0 - for i in range(len(hdbscan_params["min_samples"])): - curr_params = {} - for key, val in hdbscan_params.items(): - curr_params[key] = val[i] - curr_hdbscan_obj = hdbscan.HDBSCAN(**curr_params) - results = curr_hdbscan_obj.fit(embeddings) - curr_clusters, _, _ = _build_clusters_from_cluster_results(results, embeddings, items) - summarized_clusters = summarize_source_clusters(curr_clusters, topic, verbose=False) - cluster_summary_embeddings = run_parallel([c.summary for c in summarized_clusters], embed_text_openai, max_workers=40, desc="embedding items for clustering", disable=not verbose) - distances = pairwise_distances(cluster_summary_embeddings, metric='cosine') - np.fill_diagonal(distances, np.inf) - min_distances = np.min(distances, axis=1) - num_clusters = len(set(results.labels_)) - avg_min_distance = sum(min_distances)/len(min_distances) - closeness_to_ideal_score = (1 - (min(abs(num_clusters - IDEAL_NUM_CLUSTERS), 5*IDEAL_NUM_CLUSTERS)/(5*IDEAL_NUM_CLUSTERS))) * 0.7 - clustering_score = closeness_to_ideal_score + avg_min_distance - print("CURR PARAMS", curr_params, avg_min_distance, num_clusters, closeness_to_ideal_score, clustering_score) - if clustering_score > highest_clustering_score: - highest_clustering_score = clustering_score - print("IDEAL PARAMS", curr_params) - best_hdbscan_obj = curr_hdbscan_obj - - return best_hdbscan_obj - -def _cluster_sources(sources: list[SummarizedSource], key: Callable[[SummarizedSource], str], topic) -> list[Cluster]: - - openai_clusters = cluster_items(sources, key, embed_text_openai, _make_hdbscan_algo, topic) - # openai_clusters = cluster_items(sources, key, embed_text_openai, _make_kmeans_algo) - # voyageai_clusters = cluster_items(sources, key, embed_text_voyageai) - return openai_clusters - -def _get_cluster_size_by_source(clusters: list[Cluster]) -> dict: - pass - -def _get_topic_desc_str(topic: Topic) -> str: - topic_desc = f'{topic.title["en"]}' - if topic.description.get('en', False) and False: - topic_desc += f': {topic.description["en"]}' - return topic_desc - -def summarize_source_clusters(clusters: list[Cluster], topic, verbose=True) -> list[Cluster]: - topic_desc = _get_topic_desc_str(topic) - summarized_clusters = run_parallel(clusters, partial(summarize_cluster, context=topic_desc, key=get_text_from_source), max_workers=20, desc='summarize source clusters', disable=not verbose) - if verbose: - print('---SUMMARIES---') - for cluster in summarized_clusters: - print('\t-', len(cluster.items), cluster.summary.strip()) - return summarized_clusters - -def summarize_cluster(cluster: Cluster, context: str, key: Callable[[Any], str], sample_size=5) -> Cluster: - sample = random.sample(cluster.items, min(len(cluster), sample_size)) - strs_to_summarize = [key(item) for item in sample] - if len(cluster) == 1: - # assumes items have a summary field - cluster.summary = cluster.items[0].summary - return cluster +def _get_cluster_summary_based_on_topic(topic_desc, strs_to_summarize): llm = ChatOpenAI("gpt-4o", 0) system = SystemMessage(content="You are a Jewish scholar familiar with Torah. Given a few ideas (wrapped in " "XML tags) about a given topic (wrapped in XML tags) output a summary of the" "ideas as they related to the topic. Wrap the output in tags. Summary" "should be no more than 10 words.") - human = HumanMessage(content=f"{context}{''.join(strs_to_summarize)}") + human = HumanMessage(content=f"{topic_desc}{''.join(strs_to_summarize)}") response = llm([system, human]) - summary = get_by_xml_tag(response.content, "summary") - cluster.summary = summary - return cluster + return get_by_xml_tag(response.content, "summary") +def _cluster_sources(sources: list[SummarizedSource], topic) -> list[Cluster]: + topic_desc = _get_topic_desc_str(topic) + # get_cluster_algo will be optimized by HDBSCANOptimizerClusterer + clusterer = StandardClusterer(embed_text_openai, lambda x: HDBSCAN(), + partial(_get_cluster_summary_based_on_topic, topic_desc)) + clusterer_optimizer = HDBSCANOptimizerClusterer(clusterer) + return clusterer_optimizer.cluster_and_summarize(sources) -def cluster_items(items: list[Any], key: Callable[[Any], str], embedding_fn: Callable[[str], ndarray], - get_cluster_algo: Callable, topic, verbose=True) -> list[Cluster]: - """ - :param items: Generic list of items to cluster - :param key: function that takes an item from `items` and returns a string to pass to `embedding_fn` - :param embedding_fn: Given a str (from `key` function) return its embedding - :param get_cluster_algo: - :param verbose: - :return: list of Cluster objects - """ - embeddings = run_parallel([key(item) for item in items], embedding_fn, max_workers=40, desc="embedding items for clustering", disable=not verbose) - cluster_results = get_cluster_algo(embeddings, items, topic).fit(embeddings) - clusters, noise_items, noise_embeddings = _build_clusters_from_cluster_results(cluster_results, embeddings, items, verbose) - noise_results = _make_kmeans_algo(noise_embeddings, items, topic).fit(noise_embeddings) - noise_clusters, _, _ = _build_clusters_from_cluster_results(noise_results, noise_embeddings, noise_items, verbose) - if verbose: - print("LEN NOISE_CLUSTERS", len(noise_clusters)) - return clusters + noise_clusters - - -def _build_clusters_from_cluster_results(cluster_results, embeddings, items, verbose=True): - clusters = [] - noise_items = [] - noise_embeddings = [] - for label in set(cluster_results.labels_): - indices = np.where(cluster_results.labels_ == label)[0] - curr_embeddings = [embeddings[j] for j in indices] - curr_items = [items[j] for j in indices] - if label == -1: - noise_items += curr_items - noise_embeddings += curr_embeddings - if verbose: - print('noise cluster', len(curr_items)) - continue - clusters += [Cluster(label, curr_embeddings, curr_items)] - return clusters, noise_items, noise_embeddings - -def _guess_optimal_clustering(embeddings, verbose=True): - if len(embeddings) <= 1: - return len(embeddings) - - best_sil_coeff = -1 - best_num_clusters = 0 - MAX_MIN_CLUSTERS = 3 # the max start of the search for optimal cluster number. - n_cluster_start = min(len(embeddings), MAX_MIN_CLUSTERS) - n_cluster_end = len(embeddings)//2 - if n_cluster_end < (n_cluster_start + 1): - n_cluster_start = 2 - n_cluster_end = n_cluster_start + 1 - n_clusters = range(n_cluster_start, n_cluster_end) - for n_cluster in tqdm(n_clusters, total=len(n_clusters), desc='guess optimal clustering', disable=not verbose): - kmeans = KMeans(n_clusters=n_cluster, n_init='auto', random_state=RANDOM_SEED).fit(embeddings) - labels = kmeans.labels_ - sil_coeff = silhouette_score(embeddings, labels, metric='cosine', random_state=RANDOM_SEED) - if sil_coeff > best_sil_coeff: - best_sil_coeff = sil_coeff - best_num_clusters = n_cluster - print("Best silhouette score", round(best_sil_coeff, 4)) - return best_num_clusters +def _get_topic_desc_str(topic: Topic) -> str: + topic_desc = f'{topic.title["en"]}' + if topic.description.get('en', False) and False: + topic_desc += f': {topic.description["en"]}' + return topic_desc