diff --git a/.travis.yml b/.travis.yml
index 795d33c5a..b435a1c98 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -26,7 +26,7 @@ install:
   - pip install --upgrade pytest
   - pipenv install --dev --skip-lock
   - travis_wait 30 python -m nltk.downloader punkt
-# Install the optional neural network dependencies (Keras and TensorFlow)
+# Install the optional neural network dependencies (TensorFlow and LMDB)
 # - except for one Python version (3.8) so that we can test also without them
 #   (tensorflow 2.0.0 isn't available for python 3.8 anyway)
   - if [[ $TRAVIS_PYTHON_VERSION != '3.8' ]]; then pip install .[nn]; fi
diff --git a/annif/backend/nn_ensemble.py b/annif/backend/nn_ensemble.py
index e06c2d81d..4ebe2e40c 100644
--- a/annif/backend/nn_ensemble.py
+++ b/annif/backend/nn_ensemble.py
@@ -2,10 +2,15 @@
 projects."""
 
+from io import BytesIO
 import os.path
 import numpy as np
+from scipy.sparse import csr_matrix
+import joblib
+import lmdb
 from tensorflow.keras.layers import Input, Dense, Add, Flatten, Lambda, Dropout
 from tensorflow.keras.models import Model, load_model
+from tensorflow.keras.utils import Sequence
 import tensorflow.keras.backend as K
 import annif.corpus
 import annif.project
@@ -16,6 +21,57 @@
 from . import ensemble
 
 
+class LMDBSequence(Sequence):
+    """A sequence of samples stored in a LMDB database."""
+
+    def __init__(self, txn, batch_size):
+        self._txn = txn
+        cursor = txn.cursor()
+        if cursor.last():
+            # counter holds the number of samples in the database
+            self._counter = int(cursor.key()) + 1
+        else:  # empty database
+            self._counter = 0
+        self._batch_size = batch_size
+
+    @staticmethod
+    def int_to_key(val):
+        return b'%08d' % val
+
+    @staticmethod
+    def key_to_int(key):
+        return int(key)
+
+    def add_sample(self, inputs, targets):
+        # use zero-padded 8-digit key
+        key = self.int_to_key(self._counter)
+        self._counter += 1
+        # convert the sample into a sparse matrix and serialize it as bytes
+        sample = (csr_matrix(inputs), csr_matrix(targets))
+        buf = BytesIO()
+        joblib.dump(sample, buf)
+        buf.seek(0)
+        self._txn.put(key, buf.read())
+
+    def __getitem__(self, idx):
+        """get a particular batch of samples"""
+        cursor = self._txn.cursor()
+        first_key = idx * self._batch_size
+        last_key = first_key + self._batch_size
+        cursor.set_key(self.int_to_key(first_key))
+        input_arrays = []
+        target_arrays = []
+        for key, value in cursor.iternext():
+            if self.key_to_int(key) >= last_key:
+                break
+            input_csr, target_csr = joblib.load(BytesIO(value))
+            input_arrays.append(input_csr.toarray())
+            target_arrays.append(target_csr.toarray().flatten())
+        return np.array(input_arrays), np.array(target_arrays)
+
+    def __len__(self):
+        """return the number of available batches"""
+        return int(np.ceil(self._counter / self._batch_size))
+
+
 class NNEnsembleBackend(
         backend.AnnifLearningBackend,
         ensemble.EnsembleBackend):
@@ -25,6 +81,8 @@ class NNEnsembleBackend(
 
     name = "nn_ensemble"
     MODEL_FILE = "nn-model.h5"
+    LMDB_FILE = 'nn-train.mdb'
+    LMDB_MAP_SIZE = 1024 * 1024 * 1024
 
     DEFAULT_PARAMS = {
         'nodes': 100,
@@ -99,35 +157,32 @@ def _train(self, corpus, params):
         self._create_model(sources)
         self._fit_model(corpus, epochs=int(params['epochs']))
 
-    def _corpus_to_vectors(self, corpus):
+    def _corpus_to_vectors(self, corpus, seq):
         # pass corpus through all source projects
         sources = [(annif.project.get_project(project_id), weight)
                    for project_id, weight
                    in annif.util.parse_sources(self.params['sources'])]
 
-        score_vectors = []
-        true_vectors = []
         for doc in corpus.documents:
            doc_scores = []
            for source_project, weight in sources:
                hits = source_project.suggest(doc.text)
                doc_scores.append(hits.vector * weight)
-            score_vectors.append(np.array(doc_scores,
-                                          dtype=np.float32).transpose())
+            score_vector = np.array(doc_scores,
+                                    dtype=np.float32).transpose()
             subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
-            true_vectors.append(subjects.as_vector(self.project.subjects))
-        # collect the results into a single vector, considering weights
-        scores = np.array(score_vectors, dtype=np.float32)
-        # collect the gold standard values into another vector
-        true = np.array(true_vectors, dtype=np.float32)
-        return (scores, true)
+            true_vector = subjects.as_vector(self.project.subjects)
+            seq.add_sample(score_vector, true_vector)
 
     def _fit_model(self, corpus, epochs):
-        scores, true = self._corpus_to_vectors(corpus)
-
-        # fit the model
-        self._model.fit(scores, true, batch_size=32, verbose=True,
-                        epochs=epochs)
+        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
+        env = lmdb.open(lmdb_path, map_size=self.LMDB_MAP_SIZE, writemap=True)
+        with env.begin(write=True, buffers=True) as txn:
+            seq = LMDBSequence(txn, batch_size=32)
+            self._corpus_to_vectors(corpus, seq)
+
+            # fit the model
+            self._model.fit(seq, verbose=True, epochs=epochs)
 
         annif.util.atomic_save(
             self._model,
diff --git a/setup.py b/setup.py
index 71ddeef55..3957383a2 100644
--- a/setup.py
+++ b/setup.py
@@ -37,7 +37,7 @@ def read(fname):
         'fasttext': ['fasttext', 'fasttextmirror==0.8.22'],
         'voikko': ['voikko'],
         'vw': ['vowpalwabbit==8.7.*'],
-        'nn': ['tensorflow==2.0.*'],
+        'nn': ['tensorflow==2.0.*', 'lmdb==0.98'],
         'omikuji': ['omikuji==0.2.*'],
     },
     entry_points={
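
Note (not part of the patch): the new LMDBSequence stores each training sample as a pair of scipy CSR matrices serialized with joblib under a zero-padded 8-digit LMDB key, and reads the samples back batch by batch during model fitting. The standalone sketch below, assuming only the lmdb, joblib, scipy and numpy packages, illustrates that round trip; the database path and the toy array shapes are made up for illustration.

# Standalone sketch of the storage scheme introduced above (illustrative only):
# each sample is an (inputs, targets) pair of CSR matrices, joblib-serialized
# under a zero-padded 8-digit key, then read back as dense arrays for a batch.
from io import BytesIO

import joblib
import lmdb
import numpy as np
from scipy.sparse import csr_matrix

env = lmdb.open('demo-train.mdb', map_size=1024 * 1024 * 1024, writemap=True)
with env.begin(write=True, buffers=True) as txn:
    # write two samples under keys b'00000000' and b'00000001'
    for counter in range(2):
        inputs = np.random.rand(4, 2).astype(np.float32)  # 4 subjects, 2 sources
        targets = np.zeros(4, dtype=np.float32)           # gold-standard vector
        buf = BytesIO()
        joblib.dump((csr_matrix(inputs), csr_matrix(targets)), buf)
        buf.seek(0)
        txn.put(b'%08d' % counter, buf.read())

    # read one batch back, the same way LMDBSequence.__getitem__ does
    cursor = txn.cursor()
    cursor.set_key(b'%08d' % 0)
    input_arrays, target_arrays = [], []
    for key, value in cursor.iternext():
        input_csr, target_csr = joblib.load(BytesIO(value))
        input_arrays.append(input_csr.toarray())
        target_arrays.append(target_csr.toarray().flatten())
    batch_inputs = np.array(input_arrays)    # shape (2, 4, 2)
    batch_targets = np.array(target_arrays)  # shape (2, 4)

The zero-padding matters because LMDB iterates keys in lexicographic byte order; fixed-width keys keep that order identical to the numeric insertion order, so a batch can be read with a single cursor scan.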