Use LMDB and sparse vectors in nn_ensemble backend, instead of aggregating large vectors in RAM (#363)
osma committed Jan 28, 2020
1 parent 21d92bf commit a845cbf
Showing 3 changed files with 73 additions and 18 deletions.
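
The core idea of the change: instead of collecting every score/target vector into one huge dense array in RAM, each training sample is converted to scipy sparse matrices, serialized with joblib, and stored in an on-disk LMDB database under a zero-padded counter key; a Keras Sequence then streams the samples back in batches during training. Below is a minimal sketch of the per-sample serialization step (illustrative only, not code from this commit; the shapes and values are made up):

    from io import BytesIO
    import joblib
    import numpy as np
    from scipy.sparse import csr_matrix

    # a mostly-zero sample: scores from 3 source projects over 10000 subjects,
    # plus a gold-standard target vector
    inputs = np.zeros((10000, 3), dtype=np.float32)
    targets = np.zeros(10000, dtype=np.float32)
    inputs[42, 0] = 0.9
    targets[42] = 1.0

    # sparse matrices keep only the non-zero entries, so the serialized
    # sample stays small even for a large subject vocabulary
    sample = (csr_matrix(inputs), csr_matrix(targets))
    buf = BytesIO()
    joblib.dump(sample, buf)
    print(len(buf.getvalue()))  # a few hundred bytes instead of ~160 kB dense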
2 changes: 1 addition & 1 deletion .travis.yml
@@ -26,7 +26,7 @@ install:
 - pip install --upgrade pytest
 - pipenv install --dev --skip-lock
 - travis_wait 30 python -m nltk.downloader punkt
-# Install the optional neural network dependencies (Keras and TensorFlow)
+# Install the optional neural network dependencies (TensorFlow and LMDB)
 # - except for one Python version (3.8) so that we can test also without them
 # (tensorflow 2.0.0 isn't available for python 3.8 anyway)
 - if [[ $TRAVIS_PYTHON_VERSION != '3.8' ]]; then pip install .[nn]; fi
87 changes: 71 additions & 16 deletions annif/backend/nn_ensemble.py
@@ -2,10 +2,15 @@
 projects."""


+from io import BytesIO
 import os.path
 import numpy as np
+from scipy.sparse import csr_matrix
+import joblib
+import lmdb
 from tensorflow.keras.layers import Input, Dense, Add, Flatten, Lambda, Dropout
 from tensorflow.keras.models import Model, load_model
+from tensorflow.keras.utils import Sequence
 import tensorflow.keras.backend as K
 import annif.corpus
 import annif.project
@@ -16,6 +21,57 @@
 from . import ensemble


+class LMDBSequence(Sequence):
+    """A sequence of samples stored in an LMDB database."""
+    def __init__(self, txn, batch_size):
+        self._txn = txn
+        cursor = txn.cursor()
+        if cursor.last():
+            self._counter = int(cursor.key()) + 1
+        else:  # empty database
+            self._counter = 0
+        self._batch_size = batch_size
+
+    @staticmethod
+    def int_to_key(val):
+        return b'%08d' % val
+
+    @staticmethod
+    def key_to_int(key):
+        return int(key)
+
+    def add_sample(self, inputs, targets):
+        # use zero-padded 8-digit key
+        key = self.int_to_key(self._counter)
+        self._counter += 1
+        # convert the sample into a sparse matrix and serialize it as bytes
+        sample = (csr_matrix(inputs), csr_matrix(targets))
+        buf = BytesIO()
+        joblib.dump(sample, buf)
+        buf.seek(0)
+        self._txn.put(key, buf.read())
+
+    def __getitem__(self, idx):
+        """get a particular batch of samples"""
+        cursor = self._txn.cursor()
+        first_key = idx * self._batch_size
+        last_key = first_key + self._batch_size
+        cursor.set_key(self.int_to_key(first_key))
+        input_arrays = []
+        target_arrays = []
+        for key, value in cursor.iternext():
+            if self.key_to_int(key) >= last_key:
+                break
+            input_csr, target_csr = joblib.load(BytesIO(value))
+            input_arrays.append(input_csr.toarray())
+            target_arrays.append(target_csr.toarray().flatten())
+        return np.array(input_arrays), np.array(target_arrays)
+
+    def __len__(self):
+        """return the number of available batches"""
+        return int(np.ceil(self._counter / self._batch_size))
+
+
 class NNEnsembleBackend(
         backend.AnnifLearningBackend,
         ensemble.EnsembleBackend):
@@ -25,6 +81,8 @@ class NNEnsembleBackend(
     name = "nn_ensemble"

     MODEL_FILE = "nn-model.h5"
+    LMDB_FILE = 'nn-train.mdb'
+    LMDB_MAP_SIZE = 1024 * 1024 * 1024

     DEFAULT_PARAMS = {
         'nodes': 100,
@@ -99,35 +157,32 @@ def _train(self, corpus, params):
         self._create_model(sources)
         self._fit_model(corpus, epochs=int(params['epochs']))

-    def _corpus_to_vectors(self, corpus):
+    def _corpus_to_vectors(self, corpus, seq):
         # pass corpus through all source projects
         sources = [(annif.project.get_project(project_id), weight)
                    for project_id, weight
                    in annif.util.parse_sources(self.params['sources'])]

-        score_vectors = []
-        true_vectors = []
         for doc in corpus.documents:
             doc_scores = []
             for source_project, weight in sources:
                 hits = source_project.suggest(doc.text)
                 doc_scores.append(hits.vector * weight)
-            score_vectors.append(np.array(doc_scores,
-                                          dtype=np.float32).transpose())
+            score_vector = np.array(doc_scores,
+                                    dtype=np.float32).transpose()
             subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
-            true_vectors.append(subjects.as_vector(self.project.subjects))
-        # collect the results into a single vector, considering weights
-        scores = np.array(score_vectors, dtype=np.float32)
-        # collect the gold standard values into another vector
-        true = np.array(true_vectors, dtype=np.float32)
-        return (scores, true)
+            true_vector = subjects.as_vector(self.project.subjects)
+            seq.add_sample(score_vector, true_vector)

     def _fit_model(self, corpus, epochs):
-        scores, true = self._corpus_to_vectors(corpus)
-
-        # fit the model
-        self._model.fit(scores, true, batch_size=32, verbose=True,
-                        epochs=epochs)
+        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
+        env = lmdb.open(lmdb_path, map_size=self.LMDB_MAP_SIZE, writemap=True)
+        with env.begin(write=True, buffers=True) as txn:
+            seq = LMDBSequence(txn, batch_size=32)
+            self._corpus_to_vectors(corpus, seq)
+
+        # fit the model
+        self._model.fit(seq, verbose=True, epochs=epochs)

         annif.util.atomic_save(
             self._model,
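
For illustration, a round trip through the LMDB-backed Sequence added above; a minimal sketch assuming the LMDBSequence class is importable from annif.backend.nn_ensemble, with a made-up database path, sample count and shapes:

    import lmdb
    import numpy as np
    from annif.backend.nn_ensemble import LMDBSequence

    # open (or create) the on-disk sample database
    env = lmdb.open('/tmp/nn-train.mdb', map_size=1024 * 1024 * 1024,
                    writemap=True)
    with env.begin(write=True, buffers=True) as txn:
        seq = LMDBSequence(txn, batch_size=32)
        # write 100 fake samples: 4 source projects over 1000 subjects
        for _ in range(100):
            seq.add_sample(np.random.rand(1000, 4).astype(np.float32),
                           np.zeros(1000, dtype=np.float32))
        inputs, targets = seq[0]            # read back the first batch
        print(inputs.shape, targets.shape)  # (32, 1000, 4) (32, 1000)
        print(len(seq))                     # 4 batches, i.e. ceil(100 / 32)

Keras can consume such a Sequence directly via model.fit(seq, ...), which is what _fit_model does above; only one batch at a time is ever materialized as dense arrays.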
2 changes: 1 addition & 1 deletion setup.py
@@ -37,7 +37,7 @@ def read(fname):
         'fasttext': ['fasttext', 'fasttextmirror==0.8.22'],
         'voikko': ['voikko'],
         'vw': ['vowpalwabbit==8.7.*'],
-        'nn': ['tensorflow==2.0.*'],
+        'nn': ['tensorflow==2.0.*', 'lmdb==0.98'],
         'omikuji': ['omikuji==0.2.*'],
     },
     entry_points={
