diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7c9ce76..ff8b4b5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,28 +3,31 @@ name: "Test" on: [push, pull_request] jobs: - nose: + test: runs-on: ubuntu-latest strategy: matrix: - elastic-version: [7.12.0, 7.11.2, 7.10.2] + python-version: ['3.8', '3.10'] + + services: + elastic: + image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1 + ports: + - 9200:9200 + env: + discovery.type: single-node steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + - uses: actions/setup-python@v4 with: - python-version: "3.8" - - - name: install - run: | - docker pull "docker.elastic.co/elasticsearch/elasticsearch:${{ matrix.elastic-version }}" - docker run -d -p 9200:9200 -e "discovery.type=single-node" --tmpfs "/usr/share/elasticsearch/data" "docker.elastic.co/elasticsearch/elasticsearch:${{ matrix.elastic-version }}" + python-version: ${{ matrix.python-version }} + + - run: | pip install -r requirements.txt - pip install . + pip install -e . ./scripts/wait-for-elastic.sh - - name: test - run: nosetests \ No newline at end of file + - run: pytest \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 08403bf..2fe4ffe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,11 @@ version: "2.4" services: elastic: - image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0 + image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1 ports: - "9200:9200" environment: - discovery.type=single-node tmpfs: - /usr/share/elasticsearch/data + diff --git a/eve_elastic/elastic.py b/eve_elastic/elastic.py index 6ff298b..20117dc 100644 --- a/eve_elastic/elastic.py +++ b/eve_elastic/elastic.py @@ -5,15 +5,18 @@ import pytz # NOQA import logging import elasticsearch +import time from bson import ObjectId -from elasticsearch.helpers import bulk, reindex +from elasticsearch.helpers import bulk, reindex # noqa: F401 +from click import progressbar from uuid import uuid4 from flask import request, abort, json, current_app as app from eve.utils import config from eve.io.base import DataLayer from eve.io.mongo.parser import parse, ParseError +from elasticsearch import Elasticsearch logging.basicConfig() @@ -332,14 +335,14 @@ def set_sort(query, sort): query["sort"].append(sort_dict) -def get_es(url, **kwargs): +def get_es(url, **kwargs) -> Elasticsearch: """Create elasticsearch client instance. :param url: elasticsearch url """ urls = [url] if isinstance(url, str) else url kwargs.setdefault("serializer", ElasticJSONSerializer()) - es = elasticsearch.Elasticsearch(urls, **kwargs) + es = Elasticsearch(urls, **kwargs) return es @@ -989,7 +992,7 @@ def _resource_config(self, resource, key, default=None): px = self._resource_prefix(resource) return app.config.get("%s_%s" % (px, key), default) - def elastic(self, resource): + def elastic(self, resource) -> Elasticsearch: """Get ElasticSearch instance for given resource.""" px = self._resource_prefix(resource) @@ -1041,46 +1044,185 @@ def search(self, query, resources, params=None): except elasticsearch.exceptions.RequestError: raise - def reindex(self, resource): + def reindex(self, resource, *, requests_per_second=1000): # noqa: F811 es = self.elastic(resource) alias = self._resource_index(resource) settings = self._resource_config(resource, "SETTINGS") mapping = self._resource_mapping(resource) - new_index = False + + old_index = None try: - old_index = self.get_index(resource) + indexes = es.indices.get_alias(name=alias) + for index, aliases in indexes.items(): + old_index = index + specs = aliases["aliases"][alias] + if specs and specs["is_write_index"]: + break except elasticsearch.exceptions.NotFoundError: - new_index = True + pass + + if old_index: + print("OLD INDEX", old_index) # create new index - index = generate_index_name(alias) - print("create", index) - self._create_index(es, index, settings) - self._put_mapping(es, index, mapping) - - if not new_index: - # reindex data - print("reindex", alias, index) - reindex(es, alias, index) - - # remove old alias - print("remove alias", alias, old_index) + new_index = generate_index_name(alias) + self._create_index(es, new_index, settings) + self._put_mapping(es, new_index, mapping) + + print("NEW INDEX", new_index) + + if not old_index: try: - es.indices.delete_alias(index=old_index, name=alias) + es.indices.get(index=alias) + print("Old index is not using an alias.") + _background_reindex( + es, + alias, + new_index, + requests_per_second=requests_per_second, + refresh=True, + ) + es.indices.update_aliases( + body={ + "actions": [ + { + "add": { + "index": new_index, + "alias": alias, + "is_write_index": True, + }, + }, + { + "remove_index": { + "index": alias, + }, + }, + ], + } + ) except elasticsearch.exceptions.NotFoundError: - # this was not an alias, but an index. will be removed in next step - pass + print("There is no index to reindex from, done.") + es.indices.put_alias(index=new_index, name=alias) + return + + # tmp index will be used for new items arriving during reindex + tmp_index = f"{old_index}-tmp" + self._create_index(es, tmp_index, settings) + self._put_mapping(es, tmp_index, mapping) + print("TMP INDEX", tmp_index) + + # add tmp index as writable + es.indices.update_aliases( + body={ + "actions": [ + { + "add": { # add tmp index as write index + "index": tmp_index, + "alias": alias, + "is_write_index": True, + }, + }, + { + "add": { # make sure the old index is not write index + "index": old_index, + "alias": alias, + "is_write_index": False, + }, + }, + ], + } + ) + + _background_reindex( + es, old_index, new_index, requests_per_second=requests_per_second + ) + + # add new index is writable, tmp readonly + es.indices.update_aliases( + body={ + "actions": [ + { + "add": { # add new index as write index + "index": new_index, + "alias": alias, + "is_write_index": True, + }, + }, + { + "add": { # make tmp index readonly + "index": tmp_index, + "alias": alias, + "is_write_index": False, + }, + }, + { + "remove_index": { + "index": old_index, + }, + }, + ], + } + ) + + # do it as fast as possible + _background_reindex(es, tmp_index, new_index, refresh=True) + + print("REMOVE TMP INDEX", tmp_index) + es.indices.delete(index=tmp_index) + + +def _background_reindex( + es: Elasticsearch, + old_index: str, + new_index: str, + *, + requests_per_second=None, + refresh=False, +): + resp = es.reindex( + body={ + "source": {"index": old_index}, + "dest": {"index": new_index, "version_type": "external"}, + }, + requests_per_second=requests_per_second, + wait_for_completion=False, + refresh=refresh, + ) + task_id = resp["task"] + print(f"REINDEXING {old_index} to {new_index} (task {task_id})") + + # first get total number of items + while True: + time.sleep(1.0) + task = es.tasks.get(task_id=task_id) + if task["completed"]: + print_task_done(task) + return + if task["task"]["status"]["total"]: + break + + # now it can render progress + last_created = 0 + with progressbar(length=task["task"]["status"]["total"], label="Reindexing") as bar: + while True: + time.sleep(2.0) + task = es.tasks.get(task_id=task_id) + if ( + task["task"]["status"]["created"] + and task["task"]["status"]["created"] > last_created + ): + bar.update(task["task"]["status"]["created"] - last_created) + last_created = task["task"]["status"]["created"] + if task["completed"]: + bar.finish() + break - # remove old index - print("remove index", old_index) - es.indices.delete(old_index) + print_task_done(task) - # create alias for new index - print("put", alias, index) - es.indices.put_alias(index=index, name=alias) - print("refresh", index) - es.indices.refresh(index) +def print_task_done(task): + took = int(task["response"]["took"] / 1000) + print(f"DONE in {took}s") def build_elastic_query(doc): diff --git a/requirements.txt b/requirements.txt index 385063a..252dd1a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,2 @@ -nose -mock -werkzeug>=1.0,<3.1 +pytest eve>1.1,<2.2 -MarkupSafe>2.0,<2.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 4f74236..2c1e3b8 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ "arrow>=0.4.2", "ciso8601>=1.0.2,<3", "pytz>=2015.4", - "elasticsearch>=7.0,<7.14", + "elasticsearch>=7.0,<8.0", ], classifiers=[ "Development Status :: 4 - Beta", diff --git a/test/test_elastic.py b/test/test_elastic.py index a108a9c..aff307c 100644 --- a/test/test_elastic.py +++ b/test/test_elastic.py @@ -2,14 +2,15 @@ import eve import time +import pytest import elasticsearch + from unittest import TestCase, skip from datetime import datetime from copy import deepcopy from flask import json from eve.utils import config, ParsedRequest, parse_request from eve_elastic.elastic import parse_date, Elastic, get_es, generate_index_name -from nose.tools import raises from unittest.mock import MagicMock, patch @@ -577,12 +578,12 @@ def test_remove_non_existing_item(self): with self.app.app_context(): self.assertEqual(self.app.data.remove("items", {"_id": "notfound"}), None) - @raises(elasticsearch.exceptions.ConnectionError) def test_it_can_use_configured_url(self): - with self.app.app_context(): - self.app.config["ELASTICSEARCH_URL"] = "http://localhost:9292" - elastic = Elastic(self.app) - elastic.init_index() + with pytest.raises(elasticsearch.exceptions.ConnectionError): + with self.app.app_context(): + self.app.config["ELASTICSEARCH_URL"] = "http://localhost:9292" + elastic = Elastic(self.app) + elastic.init_index() def test_resource_aggregates(self): with self.app.app_context():