diff --git a/kolibri/core/auth/management/commands/deletefacility.py b/kolibri/core/auth/management/commands/deletefacility.py
index 4b53da5a169..eac885a1fa6 100644
--- a/kolibri/core/auth/management/commands/deletefacility.py
+++ b/kolibri/core/auth/management/commands/deletefacility.py
@@ -18,9 +18,9 @@
 from kolibri.core.auth.management.utils import DisablePostDeleteSignal
 from kolibri.core.auth.management.utils import get_facility
 from kolibri.core.auth.models import AdHocGroup
-from kolibri.core.auth.models import cache
 from kolibri.core.auth.models import Classroom
 from kolibri.core.auth.models import Collection
+from kolibri.core.auth.models import dataset_cache
 from kolibri.core.auth.models import FacilityDataset
 from kolibri.core.auth.models import FacilityUser
 from kolibri.core.auth.models import LearnerGroup
@@ -205,7 +205,7 @@ def handle_async(self, *args, **options):
                 count, stats = delete_group.delete(update_progress)
                 total_deleted += count
                 # clear related cache
-                cache.clear()
+                dataset_cache.clear()
 
         # if count doesn't match, something doesn't seem right
         if total_count != total_deleted:
diff --git a/kolibri/core/auth/management/commands/sync.py b/kolibri/core/auth/management/commands/sync.py
index d53430ad82c..3423e736fd2 100644
--- a/kolibri/core/auth/management/commands/sync.py
+++ b/kolibri/core/auth/management/commands/sync.py
@@ -19,6 +19,7 @@
 from kolibri.core.auth.constants.morango_sync import State
 from kolibri.core.auth.management.utils import get_facility
 from kolibri.core.auth.management.utils import run_once
+from kolibri.core.auth.models import dataset_cache
 from kolibri.core.tasks.management.commands.base import AsyncCommand
 from kolibri.core.tasks.utils import db_task_write_lock
 from kolibri.utils import conf
@@ -103,6 +104,8 @@ def handle_async(self, *args, **options):
         if not ScopeDefinition.objects.filter():
             call_command("loaddata", "scopedefinitions")
 
+        dataset_cache.clear()
+
         # try to connect to server
         controller = MorangoProfileController(PROFILE_FACILITY_DATA)
         network_connection = controller.create_network_connection(baseurl)
@@ -173,13 +176,9 @@ def handle_async(self, *args, **options):
             client_cert, server_cert, chunk_size=chunk_size
         )
 
-        # setup all progress trackers before starting so CLI progress is accurate
-        self._setup_progress_tracking(
-            sync_client, not no_pull, not no_push, noninteractive
-        )
-
         # pull from server and push our own data to server
         if not no_pull:
+            self._setup_pull_progress_tracking(sync_client, noninteractive)
             self._session_tracker_adapter(
                 sync_client.signals.session,
                 "Creating pull transfer session",
@@ -188,6 +187,7 @@ def handle_async(self, *args, **options):
             with db_task_write_lock:
                 sync_client.initiate_pull(Filter(dataset_id))
         if not no_push:
+            self._setup_push_progress_tracking(sync_client, noninteractive)
             self._session_tracker_adapter(
                 sync_client.signals.session,
                 "Creating push transfer session",
@@ -209,69 +209,70 @@ def handle_async(self, *args, **options):
 
         logger.info("Syncing has been completed.")
 
-    def _setup_progress_tracking(self, sync_client, pulling, pushing, noninteractive):
+    def _setup_pull_progress_tracking(self, sync_client, noninteractive):
         """
-        Sets up progress trackers for the various sync stages
+        Sets up progress trackers for pull stages
 
         :type sync_client: morango.sync.syncsession.SyncClient
-        :type pulling: bool
-        :type pushing: bool
         :type noninteractive: bool
         """
         transfer_message = "{records_transferred}/{records_total}, {transfer_total}"
 
-        if pulling:
-            self._queueing_tracker_adapter(
-                sync_client.signals.queuing,
-                "Remotely preparing data",
-                State.REMOTE_QUEUING,
-                False,
-                noninteractive,
-            )
-            self._transfer_tracker_adapter(
-                sync_client.signals.pulling,
-                "Receiving data ({})".format(transfer_message),
-                State.PULLING,
-                noninteractive,
-            )
-            self._queueing_tracker_adapter(
-                sync_client.signals.dequeuing,
-                "Locally integrating received data",
-                State.LOCAL_DEQUEUING,
-                False,
-                noninteractive,
-            )
+        self._queueing_tracker_adapter(
+            sync_client.signals.queuing,
+            "Remotely preparing data",
+            State.REMOTE_QUEUING,
+            False,
+            noninteractive,
+        )
+        self._transfer_tracker_adapter(
+            sync_client.signals.pulling,
+            "Receiving data ({})".format(transfer_message),
+            State.PULLING,
+            noninteractive,
+        )
+        self._queueing_tracker_adapter(
+            sync_client.signals.dequeuing,
+            "Locally integrating received data",
+            State.LOCAL_DEQUEUING,
+            False,
+            noninteractive,
+        )
 
-        if pushing:
-            self._queueing_tracker_adapter(
-                sync_client.signals.queuing,
-                "Locally preparing data to send",
-                State.LOCAL_QUEUING,
-                True,
-                noninteractive,
-            )
-            self._transfer_tracker_adapter(
-                sync_client.signals.pushing,
-                "Sending data ({})".format(transfer_message),
-                State.PUSHING,
-                noninteractive,
-            )
-            self._queueing_tracker_adapter(
-                sync_client.signals.dequeuing,
-                "Remotely integrating data",
-                State.REMOTE_DEQUEUING,
-                True,
-                noninteractive,
-            )
+    def _setup_push_progress_tracking(self, sync_client, noninteractive):
+        """
+        Sets up progress trackers for push stages
+
+        :type sync_client: morango.sync.syncsession.SyncClient
+        :type noninteractive: bool
+        """
+        transfer_message = "{records_transferred}/{records_total}, {transfer_total}"
+
+        self._queueing_tracker_adapter(
+            sync_client.signals.queuing,
+            "Locally preparing data to send",
+            State.LOCAL_QUEUING,
+            True,
+            noninteractive,
+        )
+        self._transfer_tracker_adapter(
+            sync_client.signals.pushing,
+            "Sending data ({})".format(transfer_message),
+            State.PUSHING,
+            noninteractive,
+        )
+        self._queueing_tracker_adapter(
+            sync_client.signals.dequeuing,
+            "Remotely integrating data",
+            State.REMOTE_DEQUEUING,
+            True,
+            noninteractive,
+        )
 
     def _update_all_progress(self, progress_fraction, progress):
         """
-        Override parent progress update callback to report from all of our progress trackers
+        Override parent progress update callback to report from the progress tracker we're sent
         """
-        total_progress = sum([p.progress for p in self.progresstrackers])
-        total = sum([p.total for p in self.progresstrackers])
-        progress_fraction = total_progress / float(total) if total > 0.0 else 0.0
-
         if self.job:
             self.job.update_progress(progress_fraction, 1.0)
             self.job.extra_metadata.update(progress.extra_data)
@@ -374,7 +375,10 @@ def _queueing_tracker_adapter(
         tracker = self.start_progress(total=2)
 
         def started(transfer_session):
-            if transfer_session.push == is_push:
+            dataset_cache.clear()
+            if transfer_session.push == is_push and (
+                noninteractive or tracker.progressbar is None
+            ):
                 logger.info(message)
 
         def handler(transfer_session):
@@ -386,5 +390,6 @@ def handler(transfer_session):
 
         if noninteractive or tracker.progressbar is None:
             signal_group.started.connect(started)
+        signal_group.started.connect(started)
         signal_group.started.connect(handler)
         signal_group.completed.connect(handler)
diff --git a/kolibri/core/auth/management/utils.py b/kolibri/core/auth/management/utils.py
index b6b5962d569..dcc99ffbcf9 100644
--- a/kolibri/core/auth/management/utils.py
+++ b/kolibri/core/auth/management/utils.py
@@ -40,7 +40,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
 
 def _interactive_client_facility_selection():
     facilities = Facility.objects.all().order_by("name")
-    message = "Please choose a facility to sync:\n"
+    message = "Please choose a facility:\n"
    for idx, facility in enumerate(facilities):
         message += "{}. {}\n".format(idx + 1, facility.name)
     idx = input(message)
diff --git a/kolibri/core/auth/models.py b/kolibri/core/auth/models.py
index b6142499cd6..6e90c98a0d6 100644
--- a/kolibri/core/auth/models.py
+++ b/kolibri/core/auth/models.py
@@ -75,9 +75,11 @@
 from kolibri.core.device.utils import set_device_settings
 from kolibri.core.errors import KolibriValidationError
 from kolibri.core.fields import DateTimeTzField
+from kolibri.core.utils.cache import NamespacedCacheProxy
 from kolibri.utils.time_utils import local_now
 
 logger = logging.getLogger(__name__)
+dataset_cache = NamespacedCacheProxy(cache, "dataset")
 
 
 def _has_permissions_class(obj):
@@ -186,13 +188,13 @@ def cached_related_dataset_lookup(self, related_obj_name):
         key = "{id}_{db_table}_dataset".format(
             id=getattr(self, field.attname), db_table=field.related_model._meta.db_table
         )
-        dataset_id = cache.get(key)
+        dataset_id = dataset_cache.get(key)
         if dataset_id is None:
             try:
                 dataset_id = getattr(self, related_obj_name).dataset_id
             except ObjectDoesNotExist as e:
                 raise ValidationError(e)
-            cache.set(key, dataset_id, 60 * 10)
+            dataset_cache.set(key, dataset_id, 60 * 10)
         return dataset_id
 
     def calculate_source_id(self):
diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py
index ec3281586f9..a63c662b859 100644
--- a/kolibri/core/tasks/api.py
+++ b/kolibri/core/tasks/api.py
@@ -989,11 +989,11 @@ def validate_prepare_sync_job(request, **kwargs):
 
     job_data = dict(
         facility=facility_id,
-        chunk_size=50,
+        chunk_size=200,
         noninteractive=True,
         extra_metadata=dict(),
         track_progress=True,
-        cancellable=True,
+        cancellable=False,
     )
 
     job_data.update(kwargs)
diff --git a/kolibri/core/tasks/test/test_api.py b/kolibri/core/tasks/test/test_api.py
index 15f62e4dd9a..3869f70b220 100644
--- a/kolibri/core/tasks/test/test_api.py
+++ b/kolibri/core/tasks/test/test_api.py
@@ -173,7 +173,7 @@ def test_startdataportalsync(self, facility_queue):
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
@@ -190,7 +190,7 @@ def test_startdataportalsync(self, facility_queue):
             call_command,
             "sync",
             facility=self.facility.id,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=dict(
                 facility=self.facility.id,
@@ -202,7 +202,7 @@ def test_startdataportalsync(self, facility_queue):
                 type="SYNCDATAPORTAL",
             ),
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )
 
     def test_startdataportalbulksync(self, facility_queue):
@@ -217,7 +217,7 @@ def test_startdataportalbulksync(self, facility_queue):
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
@@ -234,7 +234,7 @@ def test_startdataportalbulksync(self, facility_queue):
                     call_command,
                     "sync",
                     facility=facility2.id,
-                    chunk_size=50,
+                    chunk_size=200,
                     noninteractive=True,
                     extra_metadata=dict(
                         facility=facility2.id,
@@ -246,13 +246,13 @@ def test_startdataportalbulksync(self, facility_queue):
                         type="SYNCDATAPORTAL",
                     ),
                     track_progress=True,
-                    cancellable=True,
+                    cancellable=False,
                 ),
                 call(
                     call_command,
                     "sync",
                     facility=facility3.id,
-                    chunk_size=50,
+                    chunk_size=200,
                     noninteractive=True,
                     extra_metadata=dict(
                         facility=facility3.id,
@@ -264,7 +264,7 @@ def test_startdataportalbulksync(self, facility_queue):
                         type="SYNCDATAPORTAL",
                     ),
                     track_progress=True,
-                    cancellable=True,
+                    cancellable=False,
                 ),
             ]
         )
@@ -296,11 +296,11 @@ def test_startpeerfacilityimport(
             baseurl="https://some.server.test/",
             facility=self.facility.id,
             no_push=True,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=extra_metadata,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )
         validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()
@@ -309,7 +309,7 @@ def test_startpeerfacilityimport(
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         fake_job_data["extra_metadata"].update(extra_metadata)
@@ -354,11 +354,11 @@ def test_startpeerfacilitysync(
         prepared_data = dict(
             baseurl="https://some.server.test/",
             facility=self.facility.id,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=extra_metadata,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )
         validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()
@@ -367,7 +367,7 @@ def test_startpeerfacilitysync(
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         fake_job_data["extra_metadata"].update(extra_metadata)
@@ -494,10 +494,10 @@ def test_validate_prepare_sync_job(self):
 
         expected = dict(
             facility=123,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(type="test"),
         )
         actual = validate_prepare_sync_job(req, extra_metadata=dict(type="test"))
@@ -550,10 +550,10 @@ def test_validate_and_prepare_peer_sync_job(
         expected = dict(
             baseurl="https://some.server.test/",
             facility=123,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(type="test"),
         )
         actual = validate_and_prepare_peer_sync_job(
diff --git a/kolibri/core/tasks/utils.py b/kolibri/core/tasks/utils.py
index 75528624e97..3125a70f5b4 100644
--- a/kolibri/core/tasks/utils.py
+++ b/kolibri/core/tasks/utils.py
@@ -1,16 +1,10 @@
 import importlib
 import logging
-import os
 import time
 import uuid
 
-try:
-    from thread import get_ident
-except ImportError:
-    from threading import get_ident
-
 from kolibri.core.tasks import compat
-from kolibri.core.utils.cache import process_cache
+from kolibri.core.utils.cache import DiskCacheRLock
 
 
 # An object on which to store data about the current job
@@ -126,50 +120,4 @@ def shutdown(self):
         self.stop()
 
 
-class DiskCacheRLock(object):
-    """
-    Vendored from
-    https://github.com/grantjenks/python-diskcache/blob/2d1f43ea2be4c82a430d245de6260c3e18059ba1/diskcache/recipes.py
-    """
-
-    def __init__(self, cache, key, expire=None):
-        self._cache = cache
-        self._key = key
-        self._expire = expire
-
-    def acquire(self):
-        "Acquire lock by incrementing count using spin-lock algorithm."
-        pid = os.getpid()
-        tid = get_ident()
-        pid_tid = "{}-{}".format(pid, tid)
-
-        while True:
-            value, count = self._cache.get(self._key, (None, 0))
-            if pid_tid == value or count == 0:
-                self._cache.set(self._key, (pid_tid, count + 1), self._expire)
-                return
-            time.sleep(0.001)
-
-    def release(self):
-        "Release lock by decrementing count."
-        pid = os.getpid()
-        tid = get_ident()
-        pid_tid = "{}-{}".format(pid, tid)
-
-        value, count = self._cache.get(self._key, default=(None, 0))
-        is_owned = pid_tid == value and count > 0
-        assert is_owned, "cannot release un-acquired lock"
-        self._cache.set(self._key, (value, count - 1), self._expire)
-
-        # RLOCK leaves the db connection open after releasing the lock
-        # Let's ensure it's correctly closed
-        self._cache.close()
-
-    def __enter__(self):
-        self.acquire()
-
-    def __exit__(self, *exc_info):
-        self.release()
-
-
-db_task_write_lock = DiskCacheRLock(process_cache, "db_task_write_lock")
+db_task_write_lock = DiskCacheRLock("db_task_write_lock")
diff --git a/kolibri/core/test/test_utils.py b/kolibri/core/test/test_utils.py
new file mode 100644
index 00000000000..07cf0c03e1c
--- /dev/null
+++ b/kolibri/core/test/test_utils.py
@@ -0,0 +1,66 @@
+import mock
+from django.core.cache.backends.base import BaseCache
+from django.test import TestCase
+
+from kolibri.core.utils.cache import NamespacedCacheProxy
+
+
+class NamespacedCacheProxyTestCase(TestCase):
+    def setUp(self):
+        self.cache = mock.Mock(spec=BaseCache)
+        self.proxy = NamespacedCacheProxy(self.cache, "test")
+
+    def test_get_keys(self):
+        self.cache.get.return_value = ["abc"]
+        self.assertEqual(["abc"], self.proxy._get_keys())
+        self.cache.get.assert_called_with("test:1:__KEYS__", default=[])
+
+    def test_set_keys(self):
+        self.proxy._set_keys(["abc"])
+        self.cache.set.assert_called_with("test:1:__KEYS__", ["abc"])
+
+    def test_add(self):
+        self.cache.add.return_value = True
+        self.cache.get.return_value = []
+
+        result = self.proxy.add("abc", 123, "normal arg", timeout=456)
+        self.assertTrue(result)
+
+        self.cache.add.assert_called_with("test:1:abc", 123, "normal arg", timeout=456)
+        self.cache.set.assert_called_with("test:1:__KEYS__", ["abc"])
+
+    def test_add__failed(self):
+        self.cache.add.return_value = False
+        self.cache.get.return_value = []
+
+        result = self.proxy.add("abc", 123)
+        self.assertFalse(result)
+
+        self.cache.add.assert_called_with("test:1:abc", 123)
+        self.cache.set.assert_not_called()
+
+    def test_set(self):
+        self.cache.get.return_value = []
+
+        self.proxy.set("abc", 123, "normal arg", timeout=456)
+
+        self.cache.set.assert_any_call("test:1:__KEYS__", ["abc"])
+        self.cache.set.assert_any_call("test:1:abc", 123, "normal arg", timeout=456)
+
+    def test_delete(self):
+        self.cache.get.return_value = ["abc"]
+
+        self.proxy.delete("abc", 123, "normal arg", extra=456)
+
+        self.cache.set.assert_called_with("test:1:__KEYS__", [])
+        self.cache.delete.assert_called_with("test:1:abc", 123, "normal arg", extra=456)
+
+    def test_clear(self):
+        self.cache.get.return_value = ["abc", "def", "ghi"]
+
+        self.proxy.clear()
+
+        self.cache.set.assert_called_with("test:1:__KEYS__", [])
+        self.cache.delete.assert_any_call("test:1:abc")
+        self.cache.delete.assert_any_call("test:1:def")
+        self.cache.delete.assert_any_call("test:1:ghi")
diff --git a/kolibri/core/utils/cache.py b/kolibri/core/utils/cache.py
index 86f6e25fba7..88171d67e44 100644
--- a/kolibri/core/utils/cache.py
+++ b/kolibri/core/utils/cache.py
@@ -1,5 +1,14 @@
+import os
+import time
+
+try:
+    from thread import get_ident
+except ImportError:
+    from threading import get_ident
+
 from django.core.cache import caches
 from django.core.cache import InvalidCacheBackendError
+from django.core.cache.backends.base import BaseCache
 from django.utils.functional import SimpleLazyObject
 
 
@@ -11,3 +20,132 @@ def __get_process_cache():
 
 
 process_cache = SimpleLazyObject(__get_process_cache)
+
+
+class DiskCacheRLock(object):
+    """
+    Vendored from
+    https://github.com/grantjenks/python-diskcache/blob/2d1f43ea2be4c82a430d245de6260c3e18059ba1/diskcache/recipes.py
+    """
+
+    def __init__(self, key, expire=None):
+        self._cache = process_cache
+        self._key = key
+        self._expire = expire
+
+    def acquire(self):
+        "Acquire lock by incrementing count using spin-lock algorithm."
+        pid = os.getpid()
+        tid = get_ident()
+        pid_tid = "{}-{}".format(pid, tid)
+
+        while True:
+            value, count = self._cache.get(self._key, (None, 0))
+            if pid_tid == value or count == 0:
+                self._cache.set(self._key, (pid_tid, count + 1), self._expire)
+                return
+            time.sleep(0.001)
+
+    def release(self):
+        "Release lock by decrementing count."
+        pid = os.getpid()
+        tid = get_ident()
+        pid_tid = "{}-{}".format(pid, tid)
+
+        value, count = self._cache.get(self._key, default=(None, 0))
+        is_owned = pid_tid == value and count > 0
+        assert is_owned, "cannot release un-acquired lock"
+        self._cache.set(self._key, (value, count - 1), self._expire)
+
+        # RLOCK leaves the db connection open after releasing the lock
+        # Let's ensure it's correctly closed
+        self._cache.close()
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, *exc_info):
+        self.release()
+
+
+class NamespacedCacheProxy(BaseCache):
+    """
+    Namespaces keys and retains a record of inserted keys for easy clearing of
+    all namespaced keys in the cache
+    """
+
+    def __init__(self, cache, namespace, **params):
+        """
+        :type cache: BaseCache
+        :type namespace: str
+        """
+        params.update(KEY_PREFIX=namespace)
+        super(NamespacedCacheProxy, self).__init__(params)
+        self.cache = cache
+        self._lock = DiskCacheRLock("namespaced_cache_{}".format(namespace))
+
+    def _get_keys(self):
+        """
+        :rtype: list
+        """
+        key = self.make_key("__KEYS__")
+        return self.cache.get(key, default=[])
+
+    def _set_keys(self, keys):
+        """
+        :type keys: list
+        """
+        key = self.make_key("__KEYS__")
+        self.cache.set(key, keys)
+
+    def add(self, key, *args, **kwargs):
+        """
+        :type key: str
+        :rtype: bool
+        """
+        with self._lock:
+            keys = self._get_keys()
+            if key not in keys:
+                keys.append(key)
+            result = self.cache.add(self.make_key(key), *args, **kwargs)
+            if result:
+                self._set_keys(keys)
+
+        return result
+
+    def get(self, key, *args, **kwargs):
+        """
+        :type key: str
+        :rtype: any
+        """
+        with self._lock:
+            return self.cache.get(self.make_key(key), *args, **kwargs)
+
+    def set(self, key, *args, **kwargs):
+        """
+        :type key: str
+        """
+        with self._lock:
+            keys = self._get_keys()
+            if key not in keys:
+                keys.append(key)
+            self.cache.set(self.make_key(key), *args, **kwargs)
+            self._set_keys(keys)
+
+    def delete(self, key, *args, **kwargs):
+        """
+        :type key: str
+        """
+        with self._lock:
+            keys = self._get_keys()
+            self.cache.delete(self.make_key(key), *args, **kwargs)
+            self._set_keys([cached_key for cached_key in keys if cached_key != key])
+
+    def clear(self):
+        """
+        Clears only the cached keys in this namespace
+        """
+        with self._lock:
+            for key in self._get_keys():
+                self.cache.delete(self.make_key(key))
+            self._set_keys([])
diff --git a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
index 6bb41c3f986..365a4a270dc 100644
--- a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
+++ b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
@@ -21,12 +21,21 @@
 
   import commonCoreStrings from 'kolibri.coreVue.mixins.commonCoreStrings';
   import {
+    SyncTaskStatuses,
     syncFacilityTaskDisplayInfo,
     removeFacilityTaskDisplayInfo,
     importFacilityTaskDisplayInfo,
   } from '../syncTaskUtils';
   import TaskPanel from './TaskPanel';
 
+  const indeterminateSyncStatuses = [
+    SyncTaskStatuses.SESSION_CREATION,
+    SyncTaskStatuses.LOCAL_QUEUING,
+    SyncTaskStatuses.LOCAL_DEQUEUING,
+    SyncTaskStatuses.REMOTE_QUEUING,
+    SyncTaskStatuses.REMOTE_DEQUEUING,
+  ];
+
   export default {
     name: 'FacilityTaskPanel',
     components: {
@@ -69,6 +78,11 @@
         return {};
       },
       loaderType() {
+        const { sync_state = '' } = this.task;
+        if (indeterminateSyncStatuses.find(s => s === sync_state)) {
+          return 'indeterminate';
+        }
+
         return 'determinate';
       },
       statusMsg() {
diff --git a/kolibri/plugins/device/assets/src/views/syncTaskUtils.js b/kolibri/plugins/device/assets/src/views/syncTaskUtils.js
index a93a9acb756..1aeb8b218ef 100644
--- a/kolibri/plugins/device/assets/src/views/syncTaskUtils.js
+++ b/kolibri/plugins/device/assets/src/views/syncTaskUtils.js
@@ -3,7 +3,7 @@ import taskStrings from 'kolibri.coreVue.mixins.commonTaskStrings';
 import bytesForHumans from 'kolibri.utils.bytesForHumans';
 import { taskIsClearable, TaskStatuses } from '../constants';
 
-const SyncTaskStatuses = {
+export const SyncTaskStatuses = {
   SESSION_CREATION: 'SESSION_CREATION',
   REMOTE_QUEUING: 'REMOTE_QUEUING',
   PULLING: 'PULLING',
diff --git a/requirements/base.txt b/requirements/base.txt
index 53b00404c84..c6ece810a33 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -20,7 +20,7 @@ kolibri_exercise_perseus_plugin==1.3.2
 jsonfield==2.0.2
 requests-toolbelt==0.8.0
 clint==0.5.1
-morango==0.5.0
+morango==0.5.1
 tzlocal==1.5.1
 pytz==2018.5
 python-dateutil==2.7.5