Skip to content


feat: refactor and improve clustering optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
nsantacruz committed May 29, 2024
1 parent 9fffef9 commit 3957aeb
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 128 deletions.
207 changes: 91 additions & 116 deletions app/util/
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Union
from typing import Any, Callable, Union, TypeVar
from functools import reduce, partial
from tqdm import tqdm
from hdbscan import HDBSCAN
Expand All @@ -13,9 +13,11 @@
from util.general import get_by_xml_tag, run_parallel
import numpy as np

T = TypeVar('T')
RANDOM_SEED = 567454

class AbstractClusterItem(ABC):
def get_str_to_embed(self) -> str:
Expand All @@ -25,6 +27,7 @@ def get_str_to_embed(self) -> str:
def get_str_to_summarize(self) -> str:

class Cluster:
label: int
Expand All @@ -38,13 +41,71 @@ def __len__(self):

class AbstractClusterer(ABC):

def __init__(self, embedding_fn: Callable[[str], ndarray], get_cluster_summary: Callable[[list[str]], str] = None, verbose=True):
:param embedding_fn:
:param get_cluster_summary: function that takes a list of strings to summarize (sampled from cluster items) and returns a summary of the strings.
self._embedding_fn = embedding_fn
self._get_cluster_summary = get_cluster_summary
self._verbose = verbose

def embed_parallel(self, items: list[T], key: Callable[[T], ndarray], **kwargs):
return run_parallel([key(item) for item in items], self._embedding_fn, disable=not self._verbose, **kwargs)

def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]:
def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]:

def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]:
clusters = self.cluster_items(items)
return self.summarize_clusters(clusters)

def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str:
llm = ChatOpenAI("gpt-4o", 0)
system = SystemMessage(content="Given a few ideas (wrapped in <idea> "
"XML tags) output a summary of the"
"ideas. Wrap the output in <summary> tags. Summary"
"should be no more than 10 words.")
human = HumanMessage(content=f"<idea>{'</idea><idea>'.join(strs_to_summarize)}</idea>")
response = llm([system, human])
return get_by_xml_tag(response.content, "summary")

def summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster:
:param cluster: Cluster to summarize
:param sample_size: Maximum number of items to sample from a cluster. If len(cluster) < sample_size, then all items in the cluster will be chosen.
:return: the same cluster object with the `summary` attribute set.
get_cluster_summary = self._get_cluster_summary or self._default_get_cluster_summary
sample = random.sample(cluster.items, min(len(cluster), sample_size))
strs_to_summarize = [item.get_str_to_summarize() for item in sample]
if len(cluster) == 1:
cluster.summary = cluster.items[0].get_str_to_summarize()
return cluster
cluster.summary = get_cluster_summary(strs_to_summarize)
return cluster

def summarize_clusters(self, clusters: list[Cluster], **kwargs) -> list[Cluster]:
return run_parallel(clusters, partial(self.summarize_cluster, **kwargs),
max_workers=25, desc='summarize source clusters')

def _build_clusters_from_cluster_results(labels, embeddings, items):
clusters = []
noise_items = []
noise_embeddings = []
for label in np.unique(labels):
indices = np.where(labels == label)[0]
curr_embeddings = [embeddings[j] for j in indices]
curr_items = [items[j] for j in indices]
if label == -1:
noise_items += curr_items
noise_embeddings += curr_embeddings
clusters += [Cluster(label, curr_embeddings, curr_items)]
return clusters, noise_items, noise_embeddings

def _guess_optimal_kmeans_clustering(embeddings, verbose=True):
Expand Down Expand Up @@ -79,121 +140,47 @@ def make_kmeans_algo_with_optimal_silhouette_score(embeddings: list[np.ndarray])

class StandardClusterer(AbstractClusterer):

def __init__(self, embedding_fn: Callable[[str], ndarray],
get_cluster_algo: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]],
def __init__(self, embedding_fn: Callable[[str], ndarray],
get_cluster_model: Callable[[list[ndarray]], Union[KMeans, HDBSCAN]],
get_cluster_summary: Callable[[list[str]], str] = None,
verbose: bool = True):
:param embedding_fn:
:param get_cluster_algo:
:param get_cluster_summary: function that takes a list of strings to summarize (sampled from cluster items) and returns a summary of the strings.
:param get_cluster_model:
:param verbose:
self.embedding_fn = embedding_fn
self.get_cluster_algo = get_cluster_algo
self.verbose = verbose
self.get_cluster_summary = get_cluster_summary or self._default_get_cluster_summary

super().__init__(embedding_fn, get_cluster_summary, verbose)
self.get_cluster_model = get_cluster_model

def clone(self, **kwargs) -> 'StandardClusterer':
Return new object with all the same data except modifications specified in kwargs
return self.__class__(**{**self.__dict__, **kwargs})

def _default_get_cluster_summary(strs_to_summarize: list[str]) -> str:
llm = ChatOpenAI("gpt-4o", 0)
system = SystemMessage(content="Given a few ideas (wrapped in <idea> "
"XML tags) output a summary of the"
"ideas. Wrap the output in <summary> tags. Summary"
"should be no more than 10 words.")
human = HumanMessage(content=f"<idea>{'</idea><idea>'.join(strs_to_summarize)}</idea>")
response = llm([system, human])
return get_by_xml_tag(response.content, "summary")

def summarize_cluster(self, cluster: Cluster, sample_size=5) -> Cluster:
:param cluster: Cluster to summarize
:param sample_size: Maximum number of items to sample from a cluster. If len(cluster) < sample_size, then all items in the cluster will be chosen.
:return: the same cluster object with the `summary` attribute set.
sample = random.sample(cluster.items, min(len(cluster), sample_size))
strs_to_summarize = [item.get_str_to_summarize() for item in sample]
if len(cluster) == 1:
cluster.summary = cluster.items[0].get_str_to_summarize()
return cluster
cluster.summary = self.get_cluster_summary(strs_to_summarize)
return cluster

def summarize_clusters(self, clusters: list[Cluster], **kwargs) -> list[Cluster]:
return run_parallel(clusters, partial(self.summarize_cluster, **kwargs),
max_workers=25, desc='summarize source clusters', disable=not self.verbose)

def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]:
def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]:
:param items: Generic list of items to cluster
:param cluster_noise:
:return: list of Cluster objects
embeddings = run_parallel([item.get_str_to_embed() for item in items], self.embedding_fn, max_workers=40, desc="embedding items for clustering", disable=not self.verbose)
cluster_results = self.get_cluster_algo(embeddings).fit(embeddings)
clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results, embeddings, items)
if cluster_noise and len(noise_items) > 0:
embeddings = self.embed_parallel(items, lambda x: x.get_str_to_embed(), max_workers=40, desc="embedding items for clustering")
cluster_results = self.get_cluster_model(embeddings).fit(embeddings)
clusters, noise_items, noise_embeddings = self._build_clusters_from_cluster_results(cluster_results.labels_, embeddings, items)
if len(noise_items) > 0:
noise_results = make_kmeans_algo_with_optimal_silhouette_score(noise_embeddings).fit(noise_embeddings)
noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results, noise_embeddings, noise_items)
if self.verbose:
noise_clusters, _, _ = self._build_clusters_from_cluster_results(noise_results.labels_, noise_embeddings, noise_items)
if self._verbose:
print("LEN NOISE_CLUSTERS", len(noise_clusters))
noise_clusters = []
return clusters + noise_clusters

def cluster_and_summarize(self, items: list[AbstractClusterItem], **kwargs) -> list[Cluster]:
clusters = self.cluster_items(items)
return self.summarize_clusters(clusters, **kwargs)

def _build_clusters_from_cluster_results(self, cluster_results: Union[KMeans, HDBSCAN], embeddings, items):
clusters = []
noise_items = []
noise_embeddings = []
for label in set(cluster_results.labels_):
indices = np.where(cluster_results.labels_ == label)[0]
curr_embeddings = [embeddings[j] for j in indices]
curr_items = [items[j] for j in indices]
if label == -1:
noise_items += curr_items
noise_embeddings += curr_embeddings
if self.verbose:
print('noise cluster', len(curr_items))
clusters += [Cluster(label, curr_embeddings, curr_items)]
return clusters, noise_items, noise_embeddings

class OptimizingClusterer(AbstractClusterer):

class HDBSCANOptimizerClusterer(AbstractClusterer):
"min_samples": [1, 1],
"min_cluster_size": [2, 2],
"cluster_selection_method": ["eom", "leaf"],
"cluster_selection_epsilon": [0.65, 0.5],

def __init__(self, clusterer: StandardClusterer, verbose=True):
# TODO param to avoid clustering noise cluster which may mess up optimizer
self.clusterer = clusterer
self.param_search_len = len(self.HDBSCAN_PARAM_OPTS["min_samples"])
self.verbose = verbose

def __init__(self, embedding_fn: Callable[[str], ndarray], clusterers: list[AbstractClusterer], verbose=True):
super().__init__(embedding_fn, verbose=verbose)
self._clusterers = clusterers

def _embed_cluster_summaries(self, summarized_clusters: list[Cluster]):
return run_parallel(
[c.summary for c in summarized_clusters],
max_workers=40, desc="embedding items for clustering", disable=not self.verbose
return self.embed_parallel(
summarized_clusters, lambda x: x.summary, max_workers=40, desc="embedding cluster summaries to score"

def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbose=True) -> float:
Expand All @@ -208,31 +195,19 @@ def _calculate_clustering_score(self, summarized_clusters: list[Cluster], verbos
clustering_score = closeness_to_ideal_score + avg_min_distance
return clustering_score

def _get_ith_hdbscan_params(self, i):
return reduce(lambda x, y: {**x, y[0]: y[1][i]}, self.HDBSCAN_PARAM_OPTS.items(), {})

def cluster_items(self, items: list[AbstractClusterItem], cluster_noise: bool = False) -> list[Cluster]:
def cluster_items(self, items: list[AbstractClusterItem]) -> list[Cluster]:
best_clusters = None
highest_clustering_score = 0
for i in range(self.param_search_len):
curr_hdbscan_obj = HDBSCAN(**self._get_ith_hdbscan_params(i))
curr_clusterer = self.clusterer.clone(get_cluster_algo=lambda x: curr_hdbscan_obj, verbose=False)
curr_clusters = curr_clusterer.cluster_items(items, cluster_noise=cluster_noise)
summarized_clusters = curr_clusterer.summarize_clusters(curr_clusters)
for clusterer in self._clusterers:
curr_clusters = clusterer.cluster_items(items)
summarized_clusters = clusterer.summarize_clusters(curr_clusters)
clustering_score = self._calculate_clustering_score(summarized_clusters)
print("CLUSTER SCORE: ", clustering_score)
if clustering_score > highest_clustering_score:
highest_clustering_score = clustering_score
best_clusters = curr_clusters
print("best hdbscan params", self._get_ith_hdbscan_params(i))
return best_clusters

def cluster_and_summarize(self, items: list[AbstractClusterItem]) -> list[Cluster]:
clusters = self.cluster_items(items, cluster_noise=True)
summarized_clusters = self.clusterer.summarize_clusters(clusters)
if self.verbose:
print(f'---SUMMARIES--- ({len(summarized_clusters)})')
for cluster in summarized_clusters:
print('\t-', len(cluster.items), cluster.summary.strip())
return summarized_clusters

clusters = self.cluster_items(items)
return self._clusterers[0].summarize_clusters(clusters)
3 changes: 3 additions & 0 deletions experiments/topic_source_curation/
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def choose_ideal_clusters(clusters: list[Cluster], max_clusters: int) -> list[Cl
# sorted_clusters = _sort_clusters_by_instruction(clusters)
return [c for c in clusters if len(clusters) > 1]

def sort_clusters(clusters: list[Cluster], topic:Topic, max_clusters: int) -> list[Cluster]:
# print(f"Sorting {len(clusters)} clusters by interestingness...")
# sorted_clusters = _sort_clusters_by_instruction(clusters, topic)
Expand Down Expand Up @@ -244,11 +245,13 @@ def _get_highest_avg_pairwise_distance_indices(embeddings: np.ndarray) -> np.nda
sorted_indices = np.argsort(avg_distances)[::-1] # Sort in descending order
return sorted_indices

def _sort_by_highest_avg_pairwise_distance(items: list[T], key: Callable[[T], str], verbose=True) -> list[T]:
embeddings = np.array(run_parallel([key(x) for x in items], embed_text_openai, max_workers=100, desc="Embedding summaries for interestingness sort", disable=not verbose))
sorted_indices = _get_highest_avg_pairwise_distance_indices(embeddings)
return [items[i] for i in sorted_indices]

def get_gpt_compare(system_prompt, human_prompt_generator, llm):
content_to_val = {"1":-1, "2":1, "0":0}
def gpt_compare(a, b) -> int:
Expand Down
30 changes: 24 additions & 6 deletions experiments/topic_source_curation/
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Union
from functools import partial
from functools import partial, reduce
from basic_langchain.chat_models import ChatOpenAI
from basic_langchain.schema import HumanMessage, SystemMessage
import random
Expand All @@ -9,7 +9,7 @@
from basic_langchain.embeddings import VoyageAIEmbeddings, OpenAIEmbeddings
from util.pipeline import Artifact
from util.general import get_by_xml_tag
from util.cluster import Cluster, HDBSCANOptimizerClusterer, StandardClusterer, AbstractClusterItem
from util.cluster import Cluster, OptimizingClusterer, StandardClusterer, AbstractClusterItem
from experiments.topic_source_curation.common import get_topic_str_for_prompts
from experiments.topic_source_curation.summarized_source import SummarizedSource
import numpy as np
Expand Down Expand Up @@ -43,12 +43,15 @@ def get_clustered_sources(sources: list[TopicPromptSource]) -> list[Cluster]:
return Artifact(sources).pipe(_cluster_sources, get_text_from_source).data

def embed_text_openai(text):
return np.array(OpenAIEmbeddings(model="text-embedding-3-large").embed_query(text))

def embed_text_voyageai(text):
return np.array(VoyageAIEmbeddings(model="voyage-large-2-instruct").embed_query(text))

def _get_cluster_summary_based_on_topic(topic_desc, strs_to_summarize):
llm = ChatOpenAI("gpt-4o", 0)
system = SystemMessage(content="You are a Jewish scholar familiar with Torah. Given a few ideas (wrapped in <idea> "
Expand All @@ -62,10 +65,25 @@ def _get_cluster_summary_based_on_topic(topic_desc, strs_to_summarize):
summary = response.content.replace('<summary>', '').replace('</summary>', '')
return summary or 'N/A'

"min_samples": [1, 1],
"min_cluster_size": [2, 2],
"cluster_selection_method": ["eom", "leaf"],
"cluster_selection_epsilon": [0.65, 0.5],

def _get_ith_hdbscan_params(i):
return reduce(lambda x, y: {**x, y[0]: y[1][i]}, HDBSCAN_PARAM_OPTS.items(), {})

def _cluster_sources(sources: list[SummarizedSource], topic) -> list[Cluster]:
topic_desc = get_topic_str_for_prompts(topic, verbose=False)
# get_cluster_algo will be optimized by HDBSCANOptimizerClusterer
clusterer = StandardClusterer(embed_text_openai, lambda x: HDBSCAN(),
partial(_get_cluster_summary_based_on_topic, topic_desc))
clusterer_optimizer = HDBSCANOptimizerClusterer(clusterer)
clusterers = []
for i in range(len(HDBSCAN_PARAM_OPTS['min_samples'])):
hdbscan_params = _get_ith_hdbscan_params(i)
clusterers.append(StandardClusterer(embed_text_openai, lambda x: HDBSCAN(**hdbscan_params),
partial(_get_cluster_summary_based_on_topic, topic_desc)))
clusterer_optimizer = OptimizingClusterer(embed_text_openai, clusterers)
return clusterer_optimizer.cluster_and_summarize(sources)
10 changes: 5 additions & 5 deletions experiments/topic_source_curation/
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_topics_to_curate():

def save_curation(data, topic: Topic) -> list[SummarizedSource]:
sources, clusters = data
contexts = run_parallel(sources, partial(get_context_for_source, topic=topic, clusters=clusters), max_workers=40, desc="Get source context")
contexts = run_parallel(sources, partial(get_context_for_source, topic=topic, clusters=clusters), max_workers=20, desc="Get source context")
out = [{
"ref": source.source.ref,
"context": contexts[isource]
Expand All @@ -59,16 +59,16 @@ def curate_topic(topic: Topic) -> list[TopicPromptSource]:
# .pipe(save_sources, topic)
.pipe(get_clustered_sources_based_on_summaries, topic)
# .pipe(save_clusters, topic)
.pipe(save_clusters, topic)
# .pipe(load_clusters)
# .pipe(choose, topic)
# .pipe(save_curation, topic).data
.pipe(choose, topic)
.pipe(save_curation, topic).data

if __name__ == '__main__':
# topics = random.sample(get_topics_to_curate(), 50)
topics = [make_llm_topic(SefariaTopic.init(slug)) for slug in ['hagar']]
topics = [make_llm_topic(SefariaTopic.init(slug)) for slug in ['poverty']]
for t in topics:
print("CURATING", t.slug)
curated_sources = curate_topic(t)
Expand Down

0 comments on commit 3957aeb

Please sign in to comment.