From 16a58bb60db08ac0aee2b184bb519aec81ef2d2b Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Thu, 2 Mar 2023 17:07:21 +0200 Subject: [PATCH 1/8] Support _suggest_batch operation in NN ensemble backend Make the prediction on the batch in one call of the NN model --- annif/backend/nn_ensemble.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index c3caf8fcf..541e9ee5c 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -130,18 +130,29 @@ def initialize(self, parallel=False): model_filename, custom_objects={"MeanLayer": MeanLayer} ) - def _merge_hits_from_sources(self, hits_from_sources, params): - score_vector = np.array( + def _merge_hits_from_sources(self, hit_sets_from_sources, params): + score_vectors = np.array( [ - np.sqrt(hits.as_vector(len(subjects))) * weight * len(hits_from_sources) - for hits, weight, subjects in hits_from_sources + [ + np.sqrt(hits.as_vector(len(subjects))) + * weight + * len(hit_sets_from_sources) + for hits, weight, subjects in hits_from_sources + ] + for hits_from_sources in hit_sets_from_sources ], dtype=np.float32, - ) - results = self._model.predict( - np.expand_dims(score_vector.transpose(), 0), verbose=0 - ) - return VectorSuggestionResult(results[0]) + ).transpose(0, 2, 1) + results = self._model.predict(score_vectors, verbose=0) + return [VectorSuggestionResult(res) for res in results] + + def _suggest_batch(self, texts, params): + sources = annif.util.parse_sources(params["sources"]) + hit_sets_from_sources = [ + self._suggest_with_sources(text, sources) for text in texts + ] + merged_hits = self._merge_hits_from_sources(hit_sets_from_sources, params) + return merged_hits def _create_model(self, sources): self.info("creating NN ensemble model") From ba733c2ef34f2d203e4b611179ca6cdf37f60ac9 Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Thu, 2 Mar 2023 17:38:22 +0200 Subject: [PATCH 2/8] 
Use model's __call__ method for predictions --- annif/backend/nn_ensemble.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index 541e9ee5c..a3f6cc133 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -143,7 +143,7 @@ def _merge_hits_from_sources(self, hit_sets_from_sources, params): ], dtype=np.float32, ).transpose(0, 2, 1) - results = self._model.predict(score_vectors, verbose=0) + results = self._model(score_vectors).numpy() return [VectorSuggestionResult(res) for res in results] def _suggest_batch(self, texts, params): From f46c2add47dc6ab7d2addbd30849d94a5d66d23d Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Thu, 2 Mar 2023 18:42:46 +0200 Subject: [PATCH 3/8] Use batch suggest calls to base projects --- annif/backend/nn_ensemble.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index a3f6cc133..a18fafef9 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -130,6 +130,24 @@ def initialize(self, parallel=False): model_filename, custom_objects={"MeanLayer": MeanLayer} ) + def _suggest_batch_with_sources(self, texts, sources): + hit_sets_from_sources = [] + for project_id, weight in sources: + source_project = self.project.registry.get_project(project_id) + hit_sets = source_project.suggest(texts) + norm_hit_sets = [ + self._normalize_hits(hits, source_project) for hits in hit_sets + ] + hit_sets_from_sources.append( + [ + annif.suggestion.WeightedSuggestion( + hits=norm_hits, weight=weight, subjects=source_project.subjects + ) + for norm_hits in norm_hit_sets + ] + ) + return hit_sets_from_sources + def _merge_hits_from_sources(self, hit_sets_from_sources, params): score_vectors = np.array( [ @@ -142,15 +160,13 @@ def _merge_hits_from_sources(self, hit_sets_from_sources, params): for hits_from_sources in 
hit_sets_from_sources ], dtype=np.float32, - ).transpose(0, 2, 1) + ).transpose(1, 2, 0) results = self._model(score_vectors).numpy() return [VectorSuggestionResult(res) for res in results] def _suggest_batch(self, texts, params): sources = annif.util.parse_sources(params["sources"]) - hit_sets_from_sources = [ - self._suggest_with_sources(text, sources) for text in texts - ] + hit_sets_from_sources = self._suggest_batch_with_sources(texts, sources) merged_hits = self._merge_hits_from_sources(hit_sets_from_sources, params) return merged_hits From 1bfc738e0fcbcaad5a93a04843e1579cbefeee1c Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Fri, 3 Mar 2023 13:39:34 +0200 Subject: [PATCH 4/8] Implement _suggest_batch in ensemble backend --- annif/backend/ensemble.py | 43 ++++++++++++++++++------------------ annif/backend/nn_ensemble.py | 24 -------------------- 2 files changed, 22 insertions(+), 45 deletions(-) diff --git a/annif/backend/ensemble.py b/annif/backend/ensemble.py index f51382641..e7d3ace54 100644 --- a/annif/backend/ensemble.py +++ b/annif/backend/ensemble.py @@ -33,35 +33,36 @@ def _normalize_hits(self, hits, source_project): by subclasses.""" return hits - def _suggest_with_sources(self, text, sources): - hits_from_sources = [] + def _suggest_with_sources(self, texts, sources): + hit_sets_from_sources = [] for project_id, weight in sources: source_project = self.project.registry.get_project(project_id) - hits = source_project.suggest([text])[0] - self.debug( - "Got {} hits from project {}, weight {}".format( - len(hits), source_project.project_id, weight - ) - ) - norm_hits = self._normalize_hits(hits, source_project) - hits_from_sources.append( - annif.suggestion.WeightedSuggestion( - hits=norm_hits, weight=weight, subjects=source_project.subjects - ) + hit_sets = source_project.suggest(texts) + norm_hit_sets = [ + self._normalize_hits(hits, source_project) for hits in hit_sets + ] + hit_sets_from_sources.append( + [ + 
annif.suggestion.WeightedSuggestion( + hits=norm_hits, weight=weight, subjects=source_project.subjects + ) + for norm_hits in norm_hit_sets + ] ) - return hits_from_sources + return hit_sets_from_sources - def _merge_hits_from_sources(self, hits_from_sources, params): + def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params): """Hook for merging hits from sources. Can be overridden by subclasses.""" - return annif.util.merge_hits(hits_from_sources, len(self.project.subjects)) + return [ + annif.util.merge_hits(hits, len(self.project.subjects)) + for hits in hit_sets_from_sources + ] - def _suggest(self, text, params): + def _suggest_batch(self, texts, params): sources = annif.util.parse_sources(params["sources"]) - hits_from_sources = self._suggest_with_sources(text, sources) - merged_hits = self._merge_hits_from_sources(hits_from_sources, params) - self.debug("{} hits after merging".format(len(merged_hits))) - return merged_hits + hit_sets_from_sources = self._suggest_with_sources(texts, sources) + return self._merge_hit_sets_from_sources(hit_sets_from_sources, params) class EnsembleOptimizer(hyperopt.HyperparameterOptimizer): diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index a18fafef9..eca0c5263 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -130,24 +130,6 @@ def initialize(self, parallel=False): model_filename, custom_objects={"MeanLayer": MeanLayer} ) - def _suggest_batch_with_sources(self, texts, sources): - hit_sets_from_sources = [] - for project_id, weight in sources: - source_project = self.project.registry.get_project(project_id) - hit_sets = source_project.suggest(texts) - norm_hit_sets = [ - self._normalize_hits(hits, source_project) for hits in hit_sets - ] - hit_sets_from_sources.append( - [ - annif.suggestion.WeightedSuggestion( - hits=norm_hits, weight=weight, subjects=source_project.subjects - ) - for norm_hits in norm_hit_sets - ] - ) - return hit_sets_from_sources - def 
_merge_hits_from_sources(self, hit_sets_from_sources, params): score_vectors = np.array( [ @@ -164,12 +146,6 @@ def _merge_hits_from_sources(self, hit_sets_from_sources, params): results = self._model(score_vectors).numpy() return [VectorSuggestionResult(res) for res in results] - def _suggest_batch(self, texts, params): - sources = annif.util.parse_sources(params["sources"]) - hit_sets_from_sources = self._suggest_batch_with_sources(texts, sources) - merged_hits = self._merge_hits_from_sources(hit_sets_from_sources, params) - return merged_hits - def _create_model(self, sources): self.info("creating NN ensemble model") From 5e392564edd179355d61039cf7e86cecfbe4f7b2 Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Fri, 3 Mar 2023 16:20:48 +0200 Subject: [PATCH 5/8] Use batched suggest calls in EnsembleOptimizer --- annif/backend/ensemble.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/annif/backend/ensemble.py b/annif/backend/ensemble.py index e7d3ace54..7fde38adf 100644 --- a/annif/backend/ensemble.py +++ b/annif/backend/ensemble.py @@ -96,11 +96,15 @@ def _prepare(self, n_jobs=1): jobs, pool_class = annif.parallel.get_pool(n_jobs) with pool_class(jobs) as pool: - for hits, subject_set in pool.imap_unordered( - psmap.suggest, self._corpus.documents + for hit_sets, subject_sets in pool.imap_unordered( + psmap.suggest_batch, self._corpus.doc_batches ): - self._gold_subjects.append(subject_set) - self._source_hits.append(hits) + self._gold_subjects.extend(subject_sets) + self._source_hits.extend(self._hit_sets_to_list(hit_sets)) + + def _hit_sets_to_list(self, hit_sets): + """Convert a dict of lists of hits to a list of dicts of hits""" + return [dict(zip(hit_sets.keys(), hit)) for hit in zip(*hit_sets.values())] def _normalize(self, hps): total = sum(hps.values()) From 5bdfb71480ba698539560ccde6d3b38f64949d63 Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Fri, 3 Mar 2023 16:53:14 +0200 Subject: [PATCH 6/8] Fix for forgotten 
function rename --- annif/backend/nn_ensemble.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index eca0c5263..7e37728e1 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -130,7 +130,7 @@ def initialize(self, parallel=False): model_filename, custom_objects={"MeanLayer": MeanLayer} ) - def _merge_hits_from_sources(self, hit_sets_from_sources, params): + def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params): score_vectors = np.array( [ [ From 42e1afca1e05e0d9f11b8472668aacd7b2609396 Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Mon, 6 Mar 2023 12:02:54 +0200 Subject: [PATCH 7/8] Make docstring & loop var more informative --- annif/backend/ensemble.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/annif/backend/ensemble.py b/annif/backend/ensemble.py index 7fde38adf..0122f15f4 100644 --- a/annif/backend/ensemble.py +++ b/annif/backend/ensemble.py @@ -103,8 +103,16 @@ def _prepare(self, n_jobs=1): self._source_hits.extend(self._hit_sets_to_list(hit_sets)) def _hit_sets_to_list(self, hit_sets): - """Convert a dict of lists of hits to a list of dicts of hits""" - return [dict(zip(hit_sets.keys(), hit)) for hit in zip(*hit_sets.values())] + """Convert a dict of lists of hits to a list of dicts of hits, e.g. 
+ {"proj-1": [p-1-doc-1-hits, p-1-doc-2-hits], + "proj-2": [p-2-doc-1-hits, p-2-doc-2-hits]} + to + [{"proj-1": p-1-doc-1-hits, "proj-2": p-2-doc-1-hits}, + {"proj-1": p-1-doc-2-hits, "proj-2": p-2-doc-2-hits}] + """ + return [ + dict(zip(hit_sets.keys(), doc_hits)) for doc_hits in zip(*hit_sets.values()) + ] def _normalize(self, hps): total = sum(hps.values()) From f2803423a4dfd7ffef15f76a77a06ea1023940be Mon Sep 17 00:00:00 2001 From: Juho Inkinen Date: Mon, 6 Mar 2023 18:11:42 +0200 Subject: [PATCH 8/8] Turn WeightedSuggestion to WeightedSuggestionsBatch --- annif/backend/ensemble.py | 24 ++++++++++-------------- annif/backend/nn_ensemble.py | 4 ++-- annif/suggestion.py | 4 ++-- annif/util.py | 23 ++++++++++++++--------- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/annif/backend/ensemble.py b/annif/backend/ensemble.py index 0122f15f4..c57159112 100644 --- a/annif/backend/ensemble.py +++ b/annif/backend/ensemble.py @@ -42,22 +42,18 @@ def _suggest_with_sources(self, texts, sources): self._normalize_hits(hits, source_project) for hits in hit_sets ] hit_sets_from_sources.append( - [ - annif.suggestion.WeightedSuggestion( - hits=norm_hits, weight=weight, subjects=source_project.subjects - ) - for norm_hits in norm_hit_sets - ] + annif.suggestion.WeightedSuggestionsBatch( + hit_sets=norm_hit_sets, + weight=weight, + subjects=source_project.subjects, + ) ) return hit_sets_from_sources def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params): - """Hook for merging hits from sources. Can be overridden by + """Hook for merging hit sets from sources.
Can be overridden by subclasses.""" - return [ - annif.util.merge_hits(hits, len(self.project.subjects)) - for hits in hit_sets_from_sources - ] + return annif.util.merge_hits(hit_sets_from_sources, len(self.project.subjects)) def _suggest_batch(self, texts, params): sources = annif.util.parse_sources(params["sources"]) @@ -133,8 +129,8 @@ def _objective(self, trial): weighted_hits = [] for project_id, hits in srchits.items(): weighted_hits.append( - annif.suggestion.WeightedSuggestion( - hits=hits, + annif.suggestion.WeightedSuggestionsBatch( + hit_sets=[hits], weight=weights[project_id], subjects=self._backend.project.subjects, ) @@ -142,7 +138,7 @@ def _objective(self, trial): batch.evaluate( annif.util.merge_hits( weighted_hits, len(self._backend.project.subjects) - ), + )[0], goldsubj, ) results = batch.results(metrics=[self._metric]) diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py index 7e37728e1..8951094ff 100644 --- a/annif/backend/nn_ensemble.py +++ b/annif/backend/nn_ensemble.py @@ -137,9 +137,9 @@ def _merge_hit_sets_from_sources(self, hit_sets_from_sources, params): np.sqrt(hits.as_vector(len(subjects))) * weight * len(hit_sets_from_sources) - for hits, weight, subjects in hits_from_sources + for hits in proj_hit_set ] - for hits_from_sources in hit_sets_from_sources + for proj_hit_set, weight, subjects in hit_sets_from_sources ], dtype=np.float32, ).transpose(1, 2, 0) diff --git a/annif/suggestion.py b/annif/suggestion.py index fdf5aa94d..947e37451 100644 --- a/annif/suggestion.py +++ b/annif/suggestion.py @@ -7,8 +7,8 @@ import numpy as np SubjectSuggestion = collections.namedtuple("SubjectSuggestion", "subject_id score") -WeightedSuggestion = collections.namedtuple( - "WeightedSuggestion", "hits weight subjects" +WeightedSuggestionsBatch = collections.namedtuple( + "WeightedSuggestionsBatch", "hit_sets weight subjects" ) diff --git a/annif/util.py b/annif/util.py index d0bf9842c..9d3cf236c 100644 --- a/annif/util.py +++ 
b/annif/util.py @@ -54,15 +54,20 @@ def cleanup_uri(uri): return uri -def merge_hits(weighted_hits, size): - """Merge hits from multiple sources. Input is a sequence of WeightedSuggestion - objects. The size parameter determines the length of the subject vector. - Returns an SuggestionResult object.""" - - weights = [whit.weight for whit in weighted_hits] - scores = [whit.hits.as_vector(size) for whit in weighted_hits] - result = np.average(scores, axis=0, weights=weights) - return VectorSuggestionResult(result) +def merge_hits(weighted_hits_batches, size): + """Merge hit sets from multiple sources. Input is a sequence of + WeightedSuggestionsBatch objects. The size parameter determines the length of the + subject vector. Returns a list of SuggestionResult objects.""" + + weights = [batch.weight for batch in weighted_hits_batches] + score_vectors = np.array( + [ + [whits.as_vector(size) for whits in batch.hit_sets] + for batch in weighted_hits_batches + ] + ) + results = np.average(score_vectors, axis=0, weights=weights) + return [VectorSuggestionResult(res) for res in results] def parse_sources(sourcedef):