Skip to content

Commit

Permalink
Merge pull request #3294 from lbryio/versioned-es-search-index
Browse files Browse the repository at this point in the history
add versioning to ES search index and automate resync on version bumps
  • Loading branch information
jackrobison authored May 7, 2021
2 parents 79ced9d + 81ebde8 commit 48c9e9f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
31 changes: 30 additions & 1 deletion lbry/wallet/server/db/elasticsearch/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ def lookup_error(cls, url):
return LookupError(f'Could not find claim at "{url}".')


class IndexVersionMismatch(Exception):
def __init__(self, got_version, expected_version):
self.got_version = got_version
self.expected_version = expected_version


class SearchIndex:
VERSION = 1

def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
self.search_timeout = search_timeout
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
Expand All @@ -48,6 +56,18 @@ def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhos
self._elastic_host = elastic_host
self._elastic_port = elastic_port

async def get_index_version(self) -> int:
try:
template = await self.sync_client.indices.get_template(self.index)
return template[self.index]['version']
except NotFoundError:
return 0

async def set_index_version(self, version):
await self.sync_client.indices.put_template(
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
)

async def start(self):
if self.sync_client:
return
Expand All @@ -61,8 +81,17 @@ async def start(self):
except ConnectionError:
self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!")
await asyncio.sleep(1)

res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
return res.get('acknowledged', False)
acked = res.get('acknowledged', False)
if acked:
await self.set_index_version(self.VERSION)
return acked
index_version = await self.get_index_version()
if index_version != self.VERSION:
self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
raise IndexVersionMismatch(f"{index_version} vs {self.VERSION}")
return acked

def stop(self):
clients = [self.sync_client, self.search_client]
Expand Down
9 changes: 8 additions & 1 deletion lbry/wallet/server/db/elasticsearch/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from elasticsearch.helpers import async_bulk
from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch

INDEX = 'claims'

Expand Down Expand Up @@ -63,8 +63,15 @@ async def consume(producer):
async def make_es_index():
env = Env(LBC)
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)

try:
return await index.start()
except IndexVersionMismatch as err:
logging.info(
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
)
await index.delete_index()
return await index.start()
finally:
index.stop()

Expand Down

0 comments on commit 48c9e9f

Please sign in to comment.