Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor reindexing #103

Merged
merged 4 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ name: "Test"
on: [push, pull_request]

jobs:
nose:
test:
runs-on: ubuntu-latest

strategy:
matrix:
elastic-version: [7.12.0, 7.11.2, 7.10.2]
python-version: ['3.8', '3.10']

services:
elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
ports:
- 9200:9200
env:
discovery.type: single-node

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: "3.8"

- name: install
run: |
docker pull "docker.elastic.co/elasticsearch/elasticsearch:${{ matrix.elastic-version }}"
docker run -d -p 9200:9200 -e "discovery.type=single-node" --tmpfs "/usr/share/elasticsearch/data" "docker.elastic.co/elasticsearch/elasticsearch:${{ matrix.elastic-version }}"
python-version: ${{ matrix.python-version }}

- run: |
pip install -r requirements.txt
pip install .
pip install -e .
./scripts/wait-for-elastic.sh

- name: test
run: nosetests
- run: pytest
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
version: "2.4"
services:
elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
ports:
- "9200:9200"
environment:
- discovery.type=single-node
tmpfs:
- /usr/share/elasticsearch/data

171 changes: 140 additions & 31 deletions eve_elastic/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import pytz # NOQA
import logging
import elasticsearch
import time

from bson import ObjectId
from elasticsearch.helpers import bulk, reindex
from elasticsearch.helpers import bulk, reindex # noqa: F401
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any usage of reindex in this module. Is it used somewhere else that imports this module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep it's imported in superdesk-core, will remove it from there but will keep it for now to avoid a breaking change


from uuid import uuid4
from flask import request, abort, json, current_app as app
from eve.utils import config
from eve.io.base import DataLayer
from eve.io.mongo.parser import parse, ParseError
from elasticsearch import Elasticsearch


logging.basicConfig()
Expand Down Expand Up @@ -332,14 +334,14 @@ def set_sort(query, sort):
query["sort"].append(sort_dict)


def get_es(url, **kwargs) -> "Elasticsearch":
    """Create an elasticsearch client instance.

    :param url: elasticsearch url (a single string) or a list of urls
    :param kwargs: extra keyword arguments forwarded to the ``Elasticsearch``
        client constructor; a JSON serializer is supplied by default
    :return: configured ``Elasticsearch`` client
    """
    # Normalize to the list form the client expects.
    urls = [url] if isinstance(url, str) else url
    # Only set the serializer if the caller did not provide one.
    kwargs.setdefault("serializer", ElasticJSONSerializer())
    return Elasticsearch(urls, **kwargs)


Expand Down Expand Up @@ -989,7 +991,7 @@ def _resource_config(self, resource, key, default=None):
px = self._resource_prefix(resource)
return app.config.get("%s_%s" % (px, key), default)

def elastic(self, resource):
def elastic(self, resource) -> Elasticsearch:
"""Get ElasticSearch instance for given resource."""
px = self._resource_prefix(resource)

Expand Down Expand Up @@ -1041,46 +1043,153 @@ def search(self, query, resources, params=None):
except elasticsearch.exceptions.RequestError:
raise

def reindex(self, resource):
def reindex(self, resource, *, requests_per_second=1000): # noqa: F811
es = self.elastic(resource)
alias = self._resource_index(resource)
settings = self._resource_config(resource, "SETTINGS")
mapping = self._resource_mapping(resource)
new_index = False

old_index = None
try:
old_index = self.get_index(resource)
indexes = es.indices.get_alias(name=alias)
for index, aliases in indexes.items():
old_index = index
specs = aliases["aliases"][alias]
if specs and specs["is_write_index"]:
break
except elasticsearch.exceptions.NotFoundError:
new_index = True
pass

if old_index:
print("OLD INDEX", old_index)

# create new index
index = generate_index_name(alias)
print("create", index)
self._create_index(es, index, settings)
self._put_mapping(es, index, mapping)

if not new_index:
# reindex data
print("reindex", alias, index)
reindex(es, alias, index)

# remove old alias
print("remove alias", alias, old_index)
new_index = generate_index_name(alias)
self._create_index(es, new_index, settings)
self._put_mapping(es, new_index, mapping)

print("NEW INDEX", new_index)

if not old_index:
try:
es.indices.delete_alias(index=old_index, name=alias)
es.indices.get(index=alias)
print("Old index is not using an alias.")
_async_reindex(es, alias, new_index)
es.indices.update_aliases(
body={
"actions": [
{
"add": {
"index": new_index,
"alias": alias,
"is_write_index": True,
},
},
{
"remove_index": {
"index": alias,
},
},
],
}
)
except elasticsearch.exceptions.NotFoundError:
# this was not an alias, but an index. will be removed in next step
pass
print("There is no index to reindex from, done.")
es.indices.put_alias(index=new_index, name=alias)
return

# tmp index will be used for new items arriving during reindex
tmp_index = f"{old_index}-tmp"
self._create_index(es, tmp_index, settings)
self._put_mapping(es, tmp_index, mapping)
print("TMP INDEX", tmp_index)

# add tmp index as writable
es.indices.update_aliases(
body={
"actions": [
{
"add": { # add tmp index as write index
"index": tmp_index,
"alias": alias,
"is_write_index": True,
},
},
{
"add": { # make sure the old index is not write index
"index": old_index,
"alias": alias,
"is_write_index": False,
},
},
],
}
)

_async_reindex(
es, old_index, new_index, requests_per_second=requests_per_second
)

# make new index writable, tmp index readonly
es.indices.update_aliases(
body={
"actions": [
{
"add": { # add new index as write index
"index": new_index,
"alias": alias,
"is_write_index": True,
},
},
{
"add": { # make tmp index readonly
"index": tmp_index,
"alias": alias,
"is_write_index": False,
},
},
{
"remove_index": {
"index": old_index,
},
},
],
}
)

# remove old index
print("remove index", old_index)
es.indices.delete(old_index)
# do it as fast as possible
_async_reindex(es, tmp_index, new_index)

# create alias for new index
print("put", alias, index)
es.indices.put_alias(index=index, name=alias)
print("REMOVE TMP INDEX", tmp_index)
es.indices.delete(index=tmp_index)

print("refresh", index)
es.indices.refresh(index)

def _async_reindex(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although es.reindex execution here is async, this function is still sync as it waits for the task to complete before returning to the callee. Therefor using async in the function name may be misleading (especially when we venture down the async path).

es: Elasticsearch, old_index: str, new_index: str, *, requests_per_second=None
):
resp = es.reindex(
body={
"source": {"index": old_index},
"dest": {"index": new_index, "version_type": "external"},
},
requests_per_second=requests_per_second,
wait_for_completion=False,
refresh=True,
)
task_id = resp["task"]
print(f"REINDEXING {old_index} to {new_index} (task {task_id})")

while True:
time.sleep(2.0)
task_info = es.tasks.get(task_id=task_id)
if task_info["completed"]:
total = task_info["response"]["total"]
took = int(task_info["response"]["took"] / 1000) # ms to s
print()
print(f"DONE reindexing {total} items, took {took}s")
break
else:
print(".", end="")


def build_elastic_query(doc):
Expand Down
5 changes: 1 addition & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
nose
mock
werkzeug>=1.0,<1.1
pytest
eve>1.1,<1.2
MarkupSafe>2.0,<2.1
2 changes: 1 addition & 1 deletion setup.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"arrow>=0.4.2",
"ciso8601>=1.0.2,<2",
"pytz>=2015.4",
"elasticsearch>=7.0,<7.14",
"elasticsearch>=7.0,<8.0",
],
classifiers=[
"Development Status :: 4 - Beta",
Expand Down
13 changes: 7 additions & 6 deletions test/test_elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import eve
import time
import pytest
import elasticsearch

from unittest import TestCase, skip
from datetime import datetime
from copy import deepcopy
from flask import json
from eve.utils import config, ParsedRequest, parse_request
from eve_elastic.elastic import parse_date, Elastic, get_es, generate_index_name
from nose.tools import raises

from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -577,12 +578,12 @@ def test_remove_non_existing_item(self):
with self.app.app_context():
self.assertEqual(self.app.data.remove("items", {"_id": "notfound"}), None)

def test_it_can_use_configured_url(self):
    """Connecting to a port nothing listens on must raise ConnectionError."""
    # pytest.raises replaces the removed nose @raises decorator
    with pytest.raises(elasticsearch.exceptions.ConnectionError):
        with self.app.app_context():
            self.app.config["ELASTICSEARCH_URL"] = "http://localhost:9292"
            elastic = Elastic(self.app)
            elastic.init_index()

def test_resource_aggregates(self):
with self.app.app_context():
Expand Down
Loading