From 3957aeb818a1a514a546ae730e4e2022325801c8 Mon Sep 17 00:00:00 2001 From: nsantacruz Date: Wed, 29 May 2024 16:36:49 +0300 Subject: [PATCH] feat: refactor and improve clustering optimization --- app/util/cluster.py | 207 ++++++++---------- experiments/topic_source_curation/choose.py | 3 + experiments/topic_source_curation/cluster.py | 30 ++- experiments/topic_source_curation/curator.py | 10 +- .../summarized_source.py | 2 +- 5 files changed, 124 insertions(+), 128 deletions(-) diff --git a/app/util/cluster.py b/app/util/cluster.py index 444c0bf..fae8cab 100644 --- a/app/util/cluster.py +++ b/app/util/cluster.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Any, Callable, Union +from typing import Any, Callable, Union, TypeVar from functools import reduce, partial from tqdm import tqdm from hdbscan import HDBSCAN @@ -13,9 +13,11 @@ from util.general import get_by_xml_tag, run_parallel import numpy as np +T = TypeVar('T') RANDOM_SEED = 567454 random.seed(RANDOM_SEED) + class AbstractClusterItem(ABC): @abstractmethod def get_str_to_embed(self) -> str: @@ -25,6 +27,7 @@ def get_str_to_embed(self) -> str: def get_str_to_summarize(self) -> str: pass + @dataclass class Cluster: label: int @@ -38,13 +41,71 @@ def __len__(self): class AbstractClusterer(ABC): + def __init__(self, embedding_fn: Callable[[str], ndarray], get_cluster_summary: Callable[[list[str]], str] = None, verbose=True): + """ + :param embedding_fn: + :param get_cluster_summary: function that takes a list of strings to summarize (sampled from cluster items) and returns a summary of the strings. + """ + self._embedding_fn = embedding_fn + self._get_cluster_summary = get_cluster_summary + self._verbose = verbose + + def embed_parallel(self, items: list[T], key: Callable[[T], ndarray], **kwargs): + return run_parallel([key(item) for item in items], self._embedding_fn, disable=not self._verbose, **kwargs) + @abstractmethod - def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]: + def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]: pass - @abstractmethod def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]: - pass + clusters = self.cluster_items(items) + return self.summarize_clusters(clusters) + + @staticmethod + def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str: + llm = ChatOpenAI("gpt-4o", 0) + system = SystemMessage(content="Given a few ideas (wrapped in " + "XML tags) output a summary of the" + "ideas. Wrap the output in tags. Summary" + "should be no more than 10 words.") + human = HumanMessage(content=f"{''.join(strs_to_summarize)}") + response = llm([system, human]) + return get_by_xml_tag(response.content, "summary") + + def summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster: + """ + :param cluster: Cluster to summarize + :param sample_size: Maximum number of items to sample from a cluster. If len(cluster) < sample_size, then all items in the cluster will be chosen. + :return: the same cluster object with the `summary` attribute set. + """ + get_cluster_summary = self._get_cluster_summary or self._default_get_cluster_summary + sample = random.sample(cluster.items, min(len(cluster), sample_size)) + strs_to_summarize = [item.get_str_to_summarize() for item in sample] + if len(cluster) == 1: + cluster.summary = cluster.items[0].get_str_to_summarize() + return cluster + cluster.summary = get_cluster_summary(strs_to_summarize) + return cluster + + def summarize_clusters(self, clusters: list[Cluster], **kwargs) -> list[Cluster]: + return run_parallel(clusters, partial(self.summarize_cluster, **kwargs), + max_workers=25, desc='summarize source clusters') + + @staticmethod + def _build_clusters_from_cluster_results(labels, embeddings, items): + clusters = [] + noise_items = [] + noise_embeddings = [] + for label in np.unique(labels): + indices = np.where(labels == label)[0] + curr_embeddings = [embeddings[j] for j in indices] + curr_items = [items[j] for j in indices] + if label == -1: + noise_items += curr_items + noise_embeddings += curr_embeddings + continue + clusters += [Cluster(label, curr_embeddings, curr_items)] + return clusters, noise_items, noise_embeddings def _guess_optimal_kmeans_clustering(embeddings, verbose=True): @@ -79,121 +140,47 @@ def make_kmeans_algo_with_optimal_silhouette_score(embeddings: list[np.ndarray]) class StandardClusterer(AbstractClusterer): - def __init__(self, embedding_fn: Callable[[str], ndarray], - get_cluster_algo: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]], + def __init__(self, embedding_fn: Callable[[str], ndarray], + get_cluster_model: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]], get_cluster_summary: Callable[[list[str]], str] = None, verbose: bool = True): """ :param embedding_fn: - :param get_cluster_algo: - :param get_cluster_summary: function that takes a list of strings to summarize (sampled from cluster items) and returns a summary of the strings. + :param get_cluster_model: :param verbose: """ - self.embedding_fn = embedding_fn - self.get_cluster_algo = get_cluster_algo - self.verbose = verbose - self.get_cluster_summary = get_cluster_summary or self._default_get_cluster_summary - + super().__init__(embedding_fn, get_cluster_summary, verbose) + self.get_cluster_model = get_cluster_model - def clone(self, **kwargs) -> 'StandardClusterer': - """ - Return new object with all the same data except modifications specified in kwargs - """ - return self.__class__(**{**self.__dict__, **kwargs}) - - @staticmethod - def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str: - llm = ChatOpenAI("gpt-4o", 0) - system = SystemMessage(content="Given a few ideas (wrapped in " - "XML tags) output a summary of the" - "ideas. Wrap the output in tags. Summary" - "should be no more than 10 words.") - human = HumanMessage(content=f"{''.join(strs_to_summarize)}") - response = llm([system, human]) - return get_by_xml_tag(response.content, "summary") - - - def summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster: - """ - - :param cluster: Cluster to summarize - :param sample_size: Maximum number of items to sample from a cluster. If len(cluster) < sample_size, then all items in the cluster will be chosen. - :return: the same cluster object with the `summary` attribute set. - """ - sample = random.sample(cluster.items, min(len(cluster), sample_size)) - strs_to_summarize = [item.get_str_to_summarize() for item in sample] - if len(cluster) == 1: - cluster.summary = cluster.items[0].get_str_to_summarize() - return cluster - cluster.summary = self.get_cluster_summary(strs_to_summarize) - return cluster - - def summarize_clusters(self, clusters: list[Cluster], **kwargs) -> list[Cluster]: - return run_parallel(clusters, partial(self.summarize_cluster, **kwargs), - max_workers=25, desc='summarize source clusters', disable=not self.verbose) - - def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]: + def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]: """ :param items: Generic list of items to cluster - :param cluster_noise: :return: list of Cluster objects """ - embeddings = run_parallel([item.get_str_to_embed() for item in items], self.embedding_fn, max_workers=40, desc="embedding items for clustering", disable=not self.verbose) - cluster_results = self.get_cluster_algo(embeddings).fit(embeddings) - clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results, embeddings, items) - if cluster_noise and len(noise_items) > 0: + embeddings = self.embed_parallel(items, lambda x: x.get_str_to_embed(), max_workers=40, desc="embedding items for clustering") + cluster_results = self.get_cluster_model(embeddings).fit(embeddings) + clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results.labels_, embeddings, items) + if len(noise_items) > 0: noise_results = make_kmeans_algo_with_optimal_silhouette_score(noise_embeddings).fit(noise_embeddings) - noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results, noise_embeddings, noise_items) - if self.verbose: + noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results.labels_, noise_embeddings, noise_items) + if self._verbose: print("LEN NOISE_CLUSTERS", len(noise_clusters)) else: noise_clusters = [] return clusters + noise_clusters - def cluster_and_summarize(self, items: list[AbstractClusterItem], **kwargs) -> list[Cluster]: - clusters = self.cluster_items(items) - return self.summarize_clusters(clusters, **kwargs) - - - def _build_clusters_from_cluster_results(self, cluster_results: Union[KMeans, HDBSCAN], embeddings, items): - clusters = [] - noise_items = [] - noise_embeddings = [] - for label in set(cluster_results.labels_): - indices = np.where(cluster_results.labels_ == label)[0] - curr_embeddings = [embeddings[j] for j in indices] - curr_items = [items[j] for j in indices] - if label == -1: - noise_items += curr_items - noise_embeddings += curr_embeddings - if self.verbose: - print('noise cluster', len(curr_items)) - continue - clusters += [Cluster(label, curr_embeddings, curr_items)] - return clusters, noise_items, noise_embeddings - +class OptimizingClusterer(AbstractClusterer): -class HDBSCANOptimizerClusterer(AbstractClusterer): IDEAL_NUM_CLUSTERS = 20 - HDBSCAN_PARAM_OPTS = { - "min_samples": [1, 1], - "min_cluster_size": [2, 2], - "cluster_selection_method": ["eom", "leaf"], - "cluster_selection_epsilon": [0.65, 0.5], - } - - def __init__(self, clusterer: StandardClusterer, verbose=True): - # TODO param to avoid clustering noise cluster which may mess up optimizer - self.clusterer = clusterer - self.param_search_len = len(self.HDBSCAN_PARAM_OPTS["min_samples"]) - self.verbose = verbose + + def __init__(self, embedding_fn: Callable[[str], ndarray], clusterers: list[AbstractClusterer], verbose=True): + super().__init__(embedding_fn, verbose=verbose) + self._clusterers = clusterers def _embed_cluster_summaries(self, summarized_clusters: list[Cluster]): - return run_parallel( - [c.summary for c in summarized_clusters], - self.clusterer.embedding_fn, - max_workers=40, desc="embedding items for clustering", disable=not self.verbose + return self.embed_parallel( + summarized_clusters, lambda x: x.summary, max_workers=40, desc="embedding cluster summaries to score" ) def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbose=True) -> float: @@ -208,31 +195,19 @@ def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbos clustering_score = closeness_to_ideal_score + avg_min_distance return clustering_score - def _get_ith_hdbscan_params(self, i): - return reduce(lambda x, y: {**x, y[0]: y[1][i]}, self.HDBSCAN_PARAM_OPTS.items(), {}) - - def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]: + def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]: best_clusters = None highest_clustering_score = 0 - for i in range(self.param_search_len): - curr_hdbscan_obj = HDBSCAN(**self._get_ith_hdbscan_params(i)) - curr_clusterer = self.clusterer.clone(get_cluster_algo=lambda x: curr_hdbscan_obj, verbose=False) - curr_clusters = curr_clusterer.cluster_items(items, cluster_noise=cluster_noise) - summarized_clusters = curr_clusterer.summarize_clusters(curr_clusters) + for clusterer in self._clusterers: + curr_clusters = clusterer.cluster_items(items) + summarized_clusters = clusterer.summarize_clusters(curr_clusters) clustering_score = self._calculate_clustering_score(summarized_clusters) + print("CLUSTER SCORE: ", clustering_score) if clustering_score > highest_clustering_score: highest_clustering_score = clustering_score best_clusters = curr_clusters - print("best hdbscan params", self._get_ith_hdbscan_params(i)) return best_clusters def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]: - clusters = self.cluster_items(items, cluster_noise=True) - summarized_clusters = self.clusterer.summarize_clusters(clusters) - if self.verbose: - print(f'---SUMMARIES--- ({len(summarized_clusters)})') - for cluster in summarized_clusters: - print('\t-', len(cluster.items), cluster.summary.strip()) - return summarized_clusters - - + clusters = self.cluster_items(items) + return self._clusterers[0].summarize_clusters(clusters) diff --git a/experiments/topic_source_curation/choose.py b/experiments/topic_source_curation/choose.py index c7eca77..5159f32 100644 --- a/experiments/topic_source_curation/choose.py +++ b/experiments/topic_source_curation/choose.py @@ -190,6 +190,7 @@ def choose_ideal_clusters(clusters: list[Cluster], max_clusters: int) -> list[Cl # sorted_clusters = _sort_clusters_by_instruction(clusters) return [c for c in clusters if len(clusters) > 1] + def sort_clusters(clusters: list[Cluster], topic:Topic, max_clusters: int) -> list[Cluster]: # print(f"Sorting {len(clusters)} clusters by interestingness...") # sorted_clusters = _sort_clusters_by_instruction(clusters, topic) @@ -244,11 +245,13 @@ def _get_highest_avg_pairwise_distance_indices(embeddings: np.ndarray) -> np.nda sorted_indices = np.argsort(avg_distances)[::-1] # Sort in descending order return sorted_indices + def _sort_by_highest_avg_pairwise_distance(items: list[T], key: Callable[[T], str], verbose=True) -> list[T]: embeddings = np.array(run_parallel([key(x) for x in items], embed_text_openai, max_workers=100, desc="Embedding summaries for interestingness sort", disable=not verbose)) sorted_indices = _get_highest_avg_pairwise_distance_indices(embeddings) return [items[i] for i in sorted_indices] + def get_gpt_compare(system_prompt, human_prompt_generator, llm): content_to_val = {"1":-1, "2":1, "0":0} def gpt_compare(a, b) -> int: diff --git a/experiments/topic_source_curation/cluster.py b/experiments/topic_source_curation/cluster.py index 6d7fe33..f2a2eec 100644 --- a/experiments/topic_source_curation/cluster.py +++ b/experiments/topic_source_curation/cluster.py @@ -1,5 +1,5 @@ from typing import Union -from functools import partial +from functools import partial, reduce from basic_langchain.chat_models import ChatOpenAI from basic_langchain.schema import HumanMessage, SystemMessage import random @@ -9,7 +9,7 @@ from basic_langchain.embeddings import VoyageAIEmbeddings, OpenAIEmbeddings from util.pipeline import Artifact from util.general import get_by_xml_tag -from util.cluster import Cluster, HDBSCANOptimizerClusterer, StandardClusterer, AbstractClusterItem +from util.cluster import Cluster, OptimizingClusterer, StandardClusterer, AbstractClusterItem from experiments.topic_source_curation.common import get_topic_str_for_prompts from experiments.topic_source_curation.summarized_source import SummarizedSource import numpy as np @@ -43,12 +43,15 @@ def get_clustered_sources(sources: list[TopicPromptSource]) -> list[Cluster]: """ return Artifact(sources).pipe(_cluster_sources, get_text_from_source).data + def embed_text_openai(text): return np.array(OpenAIEmbeddings(model="text-embedding-3-large").embed_query(text)) + def embed_text_voyageai(text): return np.array(VoyageAIEmbeddings(model="voyage-large-2-instruct").embed_query(text)) + def _get_cluster_summary_based_on_topic(topic_desc, strs_to_summarize): llm = ChatOpenAI("gpt-4o", 0) system = SystemMessage(content="You are a Jewish scholar familiar with Torah. Given a few ideas (wrapped in " @@ -62,10 +65,25 @@ def _get_cluster_summary_based_on_topic(topic_desc, strs_to_summarize): summary = response.content.replace('', '').replace('', '') return summary or 'N/A' + +HDBSCAN_PARAM_OPTS = { + "min_samples": [1, 1], + "min_cluster_size": [2, 2], + "cluster_selection_method": ["eom", "leaf"], + "cluster_selection_epsilon": [0.65, 0.5], +} + + +def _get_ith_hdbscan_params(i): + return reduce(lambda x, y: {**x, y[0]: y[1][i]}, HDBSCAN_PARAM_OPTS.items(), {}) + + def _cluster_sources(sources: list[SummarizedSource], topic) -> list[Cluster]: topic_desc = get_topic_str_for_prompts(topic, verbose=False) - # get_cluster_algo will be optimized by HDBSCANOptimizerClusterer - clusterer = StandardClusterer(embed_text_openai, lambda x: HDBSCAN(), - partial(_get_cluster_summary_based_on_topic, topic_desc)) - clusterer_optimizer = HDBSCANOptimizerClusterer(clusterer) + clusterers = [] + for i in range(len(HDBSCAN_PARAM_OPTS['min_samples'])): + hdbscan_params = _get_ith_hdbscan_params(i) + clusterers.append(StandardClusterer(embed_text_openai, lambda x: HDBSCAN(**hdbscan_params), + partial(_get_cluster_summary_based_on_topic, topic_desc))) + clusterer_optimizer = OptimizingClusterer(embed_text_openai, clusterers) return clusterer_optimizer.cluster_and_summarize(sources) diff --git a/experiments/topic_source_curation/curator.py b/experiments/topic_source_curation/curator.py index f1480f9..2c1fc08 100644 --- a/experiments/topic_source_curation/curator.py +++ b/experiments/topic_source_curation/curator.py @@ -43,7 +43,7 @@ def get_topics_to_curate(): def save_curation(data, topic: Topic) -> list[SummarizedSource]: sources, clusters = data - contexts = run_parallel(sources, partial(get_context_for_source, topic=topic, clusters=clusters), max_workers=40, desc="Get source context") + contexts = run_parallel(sources, partial(get_context_for_source, topic=topic, clusters=clusters), max_workers=20, desc="Get source context") out = [{ "ref": source.source.ref, "context": contexts[isource] @@ -59,16 +59,16 @@ def curate_topic(topic: Topic) -> list[TopicPromptSource]: # .pipe(save_sources, topic) .pipe(load_sources) .pipe(get_clustered_sources_based_on_summaries, topic) - # .pipe(save_clusters, topic) + .pipe(save_clusters, topic) # .pipe(load_clusters) - # .pipe(choose, topic) - # .pipe(save_curation, topic).data + .pipe(choose, topic) + .pipe(save_curation, topic).data ) if __name__ == '__main__': # topics = random.sample(get_topics_to_curate(), 50) - topics = [make_llm_topic(SefariaTopic.init(slug)) for slug in ['hagar']] + topics = [make_llm_topic(SefariaTopic.init(slug)) for slug in ['poverty']] for t in topics: print("CURATING", t.slug) curated_sources = curate_topic(t) diff --git a/experiments/topic_source_curation/summarized_source.py b/experiments/topic_source_curation/summarized_source.py index bcb6d4c..0d269a9 100644 --- a/experiments/topic_source_curation/summarized_source.py +++ b/experiments/topic_source_curation/summarized_source.py @@ -21,7 +21,7 @@ def serialize(self) -> dict: return serial def get_str_to_summarize(self) -> str: - return self.summary + return self.summary or 'N/A' def get_str_to_embed(self) -> str: return self.summary