Merge pull request #7187 from bjester/facility-cmd-improvements

Facility command and task improvements

jonboiser authored Jun 25, 2020
2 parents 85c5e8c + 7679955 commit a35eb33
Showing 12 changed files with 311 additions and 138 deletions.
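Note: the visible changes fall into three groups: dataset lookups move from the shared Django cache to a namespaced dataset_cache proxy so sync-related invalidation no longer flushes unrelated entries; the sync command's progress-tracker setup is split into separate pull and push paths that are registered only for stages that actually run; and facility sync tasks now default to chunk_size=200 and cancellable=False.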
4 changes: 2 additions & 2 deletions kolibri/core/auth/management/commands/deletefacility.py
@@ -18,9 +18,9 @@
 from kolibri.core.auth.management.utils import DisablePostDeleteSignal
 from kolibri.core.auth.management.utils import get_facility
 from kolibri.core.auth.models import AdHocGroup
-from kolibri.core.auth.models import cache
 from kolibri.core.auth.models import Classroom
 from kolibri.core.auth.models import Collection
+from kolibri.core.auth.models import dataset_cache
 from kolibri.core.auth.models import FacilityDataset
 from kolibri.core.auth.models import FacilityUser
 from kolibri.core.auth.models import LearnerGroup
@@ -205,7 +205,7 @@ def handle_async(self, *args, **options):
             count, stats = delete_group.delete(update_progress)
             total_deleted += count
             # clear related cache
-            cache.clear()
+            dataset_cache.clear()

         # if count doesn't match, something doesn't seem right
         if total_count != total_deleted:
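Note: deletefacility previously called cache.clear(), flushing the entire shared cache just to drop a handful of dataset-lookup keys. With the namespaced proxy introduced in models.py (below), the clearing is scoped. A minimal sketch of the behavioral difference, assuming the proxy mirrors Django's cache API as the .get/.set/.clear calls in this diff suggest (key names are hypothetical):

from django.core.cache import cache

from kolibri.core.auth.models import dataset_cache

cache.set("unrelated_entry", "kept")          # ordinary shared-cache entry
dataset_cache.set("user_42_dataset", "abc")   # entry written through the proxy

dataset_cache.clear()  # removes only keys written through the proxy

assert cache.get("unrelated_entry") == "kept"   # shared cache is untouched
assert dataset_cache.get("user_42_dataset") is None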
119 changes: 62 additions & 57 deletions kolibri/core/auth/management/commands/sync.py
@@ -19,6 +19,7 @@
 from kolibri.core.auth.constants.morango_sync import State
 from kolibri.core.auth.management.utils import get_facility
 from kolibri.core.auth.management.utils import run_once
+from kolibri.core.auth.models import dataset_cache
 from kolibri.core.tasks.management.commands.base import AsyncCommand
 from kolibri.core.tasks.utils import db_task_write_lock
 from kolibri.utils import conf
@@ -103,6 +104,8 @@ def handle_async(self, *args, **options):
         if not ScopeDefinition.objects.filter():
             call_command("loaddata", "scopedefinitions")

+        dataset_cache.clear()
+
         # try to connect to server
         controller = MorangoProfileController(PROFILE_FACILITY_DATA)
         network_connection = controller.create_network_connection(baseurl)
@@ -173,13 +176,9 @@ def handle_async(self, *args, **options):
             client_cert, server_cert, chunk_size=chunk_size
         )

-        # setup all progress trackers before starting so CLI progress is accurate
-        self._setup_progress_tracking(
-            sync_client, not no_pull, not no_push, noninteractive
-        )
-
         # pull from server and push our own data to server
         if not no_pull:
+            self._setup_pull_progress_tracking(sync_client, noninteractive)
             self._session_tracker_adapter(
                 sync_client.signals.session,
                 "Creating pull transfer session",
@@ -188,6 +187,7 @@ def handle_async(self, *args, **options):
             with db_task_write_lock:
                 sync_client.initiate_pull(Filter(dataset_id))
         if not no_push:
+            self._setup_push_progress_tracking(sync_client, noninteractive)
             self._session_tracker_adapter(
                 sync_client.signals.session,
                 "Creating push transfer session",
@@ -209,69 +209,70 @@ def handle_async(self, *args, **options):

         logger.info("Syncing has been completed.")

-    def _setup_progress_tracking(self, sync_client, pulling, pushing, noninteractive):
+    def _setup_pull_progress_tracking(self, sync_client, noninteractive):
         """
-        Sets up progress trackers for the various sync stages
+        Sets up progress trackers for pull stages
         :type sync_client: morango.sync.syncsession.SyncClient
-        :type pulling: bool
-        :type pushing: bool
         :type noninteractive: bool
         """
         transfer_message = "{records_transferred}/{records_total}, {transfer_total}"

-        if pulling:
-            self._queueing_tracker_adapter(
-                sync_client.signals.queuing,
-                "Remotely preparing data",
-                State.REMOTE_QUEUING,
-                False,
-                noninteractive,
-            )
-            self._transfer_tracker_adapter(
-                sync_client.signals.pulling,
-                "Receiving data ({})".format(transfer_message),
-                State.PULLING,
-                noninteractive,
-            )
-            self._queueing_tracker_adapter(
-                sync_client.signals.dequeuing,
-                "Locally integrating received data",
-                State.LOCAL_DEQUEUING,
-                False,
-                noninteractive,
-            )
+        self._queueing_tracker_adapter(
+            sync_client.signals.queuing,
+            "Remotely preparing data",
+            State.REMOTE_QUEUING,
+            False,
+            noninteractive,
+        )
+        self._transfer_tracker_adapter(
+            sync_client.signals.pulling,
+            "Receiving data ({})".format(transfer_message),
+            State.PULLING,
+            noninteractive,
+        )
+        self._queueing_tracker_adapter(
+            sync_client.signals.dequeuing,
+            "Locally integrating received data",
+            State.LOCAL_DEQUEUING,
+            False,
+            noninteractive,
+        )

-        if pushing:
-            self._queueing_tracker_adapter(
-                sync_client.signals.queuing,
-                "Locally preparing data to send",
-                State.LOCAL_QUEUING,
-                True,
-                noninteractive,
-            )
-            self._transfer_tracker_adapter(
-                sync_client.signals.pushing,
-                "Sending data ({})".format(transfer_message),
-                State.PUSHING,
-                noninteractive,
-            )
-            self._queueing_tracker_adapter(
-                sync_client.signals.dequeuing,
-                "Remotely integrating data",
-                State.REMOTE_DEQUEUING,
-                True,
-                noninteractive,
-            )
+    def _setup_push_progress_tracking(self, sync_client, noninteractive):
+        """
+        Sets up progress trackers for push stages
+        :type sync_client: morango.sync.syncsession.SyncClient
+        :type noninteractive: bool
+        """
+        transfer_message = "{records_transferred}/{records_total}, {transfer_total}"
+
+        self._queueing_tracker_adapter(
+            sync_client.signals.queuing,
+            "Locally preparing data to send",
+            State.LOCAL_QUEUING,
+            True,
+            noninteractive,
+        )
+        self._transfer_tracker_adapter(
+            sync_client.signals.pushing,
+            "Sending data ({})".format(transfer_message),
+            State.PUSHING,
+            noninteractive,
+        )
+        self._queueing_tracker_adapter(
+            sync_client.signals.dequeuing,
+            "Remotely integrating data",
+            State.REMOTE_DEQUEUING,
+            True,
+            noninteractive,
+        )

     def _update_all_progress(self, progress_fraction, progress):
         """
-        Override parent progress update callback to report from all of our progress trackers
+        Override parent progress update callback to report from the progress tracker we're sent
         """
-        total_progress = sum([p.progress for p in self.progresstrackers])
-        total = sum([p.total for p in self.progresstrackers])
-        progress_fraction = total_progress / float(total) if total > 0.0 else 0.0
-
         if self.job:
             self.job.update_progress(progress_fraction, 1.0)
             self.job.extra_metadata.update(progress.extra_data)
@@ -374,7 +375,10 @@ def _queueing_tracker_adapter(
         tracker = self.start_progress(total=2)

         def started(transfer_session):
-            if transfer_session.push == is_push:
+            dataset_cache.clear()
+            if transfer_session.push == is_push and (
+                noninteractive or tracker.progressbar is None
+            ):
                 logger.info(message)

         def handler(transfer_session):
@@ -386,5 +390,6 @@ def handler(transfer_session):
         if noninteractive or tracker.progressbar is None:
             signal_group.started.connect(started)

+        signal_group.started.connect(started)
         signal_group.started.connect(handler)
         signal_group.completed.connect(handler)
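Note: tracker setup now happens lazily inside the pull and push branches, so a pull-only or push-only run registers only the trackers for stages that will actually execute; correspondingly, _update_all_progress simply forwards the fraction it is handed instead of re-aggregating self.progresstrackers. A sketch of driving the command programmatically with the same options the tests below assert (baseurl and facility id are placeholder values):

from django.core.management import call_command

# Pull-only sync: only the pull-side trackers (remote queuing, pulling,
# local dequeuing) get set up, so reported progress matches the stages run.
call_command(
    "sync",
    baseurl="https://some.server.test/",          # placeholder server
    facility="0123456789abcdef0123456789abcdef",  # hypothetical facility id
    no_push=True,
    chunk_size=200,
    noninteractive=True,
)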
2 changes: 1 addition & 1 deletion kolibri/core/auth/management/utils.py
@@ -40,7 +40,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

 def _interactive_client_facility_selection():
     facilities = Facility.objects.all().order_by("name")
-    message = "Please choose a facility to sync:\n"
+    message = "Please choose a facility:\n"
     for idx, facility in enumerate(facilities):
         message += "{}. {}\n".format(idx + 1, facility.name)
     idx = input(message)
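Note: the prompt loses "to sync" because this interactive selection sits behind get_facility, which this diff shows imported by both the sync and deletefacility commands, so the old sync-specific wording no longer fits every caller.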
6 changes: 4 additions & 2 deletions kolibri/core/auth/models.py
@@ -75,9 +75,11 @@
 from kolibri.core.device.utils import set_device_settings
 from kolibri.core.errors import KolibriValidationError
 from kolibri.core.fields import DateTimeTzField
+from kolibri.core.utils.cache import NamespacedCacheProxy
 from kolibri.utils.time_utils import local_now

 logger = logging.getLogger(__name__)
+dataset_cache = NamespacedCacheProxy(cache, "dataset")


 def _has_permissions_class(obj):
@@ -186,13 +188,13 @@ def cached_related_dataset_lookup(self, related_obj_name):
         key = "{id}_{db_table}_dataset".format(
             id=getattr(self, field.attname), db_table=field.related_model._meta.db_table
         )
-        dataset_id = cache.get(key)
+        dataset_id = dataset_cache.get(key)
         if dataset_id is None:
             try:
                 dataset_id = getattr(self, related_obj_name).dataset_id
             except ObjectDoesNotExist as e:
                 raise ValidationError(e)
-            cache.set(key, dataset_id, 60 * 10)
+            dataset_cache.set(key, dataset_id, 60 * 10)
         return dataset_id

     def calculate_source_id(self):
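Note: NamespacedCacheProxy (from kolibri.core.utils.cache) is what makes the scoped dataset_cache.clear() calls above possible. Its real implementation is not part of this diff; the following is only a minimal sketch of the general technique, bookkeeping the keys written under a namespace so clear() can delete exactly those:

class NamespacedCacheSketch(object):
    """Illustrative only -- not kolibri's actual NamespacedCacheProxy."""

    def __init__(self, cache, namespace):
        self.cache = cache
        self.namespace = namespace
        self.registry_key = "{}:__keys__".format(namespace)

    def _key(self, key):
        return "{}:{}".format(self.namespace, key)

    def get(self, key, default=None):
        return self.cache.get(self._key(key), default)

    def set(self, key, value, timeout=None):
        # record every key written so clear() can target just this namespace
        keys = self.cache.get(self.registry_key, [])
        if key not in keys:
            keys.append(key)
            self.cache.set(self.registry_key, keys, None)
        self.cache.set(self._key(key), value, timeout)

    def clear(self):
        # delete only this namespace's keys; the rest of the cache survives
        for key in self.cache.get(self.registry_key, []):
            self.cache.delete(self._key(key))
        self.cache.delete(self.registry_key)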
4 changes: 2 additions & 2 deletions kolibri/core/tasks/api.py
@@ -989,11 +989,11 @@ def validate_prepare_sync_job(request, **kwargs):

     job_data = dict(
         facility=facility_id,
-        chunk_size=50,
+        chunk_size=200,
         noninteractive=True,
         extra_metadata=dict(),
         track_progress=True,
-        cancellable=True,
+        cancellable=False,
     )

     job_data.update(kwargs)
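Note: two job defaults change here: chunk_size rises from 50 to 200 records per transfer batch (fewer round-trips per sync), and cancellable flips to False, plausibly because cancelling mid-transfer-session leaves state that is cleaner to avoid than to unwind. The resulting job data for a facility sync, with a hypothetical facility id for illustration:

facility_id = "0123456789abcdef0123456789abcdef"  # hypothetical, for illustration

job_data = dict(
    facility=facility_id,
    chunk_size=200,         # was 50: larger batches per sync round-trip
    noninteractive=True,
    extra_metadata=dict(),
    track_progress=True,
    cancellable=False,      # was True: facility sync tasks can no longer be cancelled
)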
36 changes: 18 additions & 18 deletions kolibri/core/tasks/test/test_api.py
@@ -173,7 +173,7 @@ def test_startdataportalsync(self, facility_queue):
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
@@ -190,7 +190,7 @@
             call_command,
             "sync",
             facility=self.facility.id,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=dict(
                 facility=self.facility.id,
@@ -202,7 +202,7 @@
                 type="SYNCDATAPORTAL",
             ),
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )

     def test_startdataportalbulksync(self, facility_queue):
@@ -217,7 +217,7 @@ def test_startdataportalbulksync(self, facility_queue):
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
@@ -234,7 +234,7 @@
                 call_command,
                 "sync",
                 facility=facility2.id,
-                chunk_size=50,
+                chunk_size=200,
                 noninteractive=True,
                 extra_metadata=dict(
                     facility=facility2.id,
@@ -246,13 +246,13 @@
                     type="SYNCDATAPORTAL",
                 ),
                 track_progress=True,
-                cancellable=True,
+                cancellable=False,
             ),
             call(
                 call_command,
                 "sync",
                 facility=facility3.id,
-                chunk_size=50,
+                chunk_size=200,
                 noninteractive=True,
                 extra_metadata=dict(
                     facility=facility3.id,
@@ -264,7 +264,7 @@
                     type="SYNCDATAPORTAL",
                 ),
                 track_progress=True,
-                cancellable=True,
+                cancellable=False,
             ),
         ]
     )
@@ -296,11 +296,11 @@ def test_startpeerfacilityimport(
             baseurl="https://some.server.test/",
             facility=self.facility.id,
             no_push=True,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=extra_metadata,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )
         validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()

@@ -309,7 +309,7 @@
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         fake_job_data["extra_metadata"].update(extra_metadata)
@@ -354,11 +354,11 @@ def test_startpeerfacilitysync(
         prepared_data = dict(
             baseurl="https://some.server.test/",
             facility=self.facility.id,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             extra_metadata=extra_metadata,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
         )
         validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()

@@ -367,7 +367,7 @@
             job_id=123,
             state="testing",
             percentage_progress=42,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(this_is_extra=True,),
         )
         fake_job_data["extra_metadata"].update(extra_metadata)
@@ -494,10 +494,10 @@ def test_validate_prepare_sync_job(self):

         expected = dict(
             facility=123,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(type="test"),
         )
         actual = validate_prepare_sync_job(req, extra_metadata=dict(type="test"))
@@ -550,10 +550,10 @@ def test_validate_and_prepare_peer_sync_job(
         expected = dict(
             baseurl="https://some.server.test/",
             facility=123,
-            chunk_size=50,
+            chunk_size=200,
             noninteractive=True,
             track_progress=True,
-            cancellable=True,
+            cancellable=False,
             extra_metadata=dict(type="test"),
         )
         actual = validate_and_prepare_peer_sync_job(
(The remaining 6 of the 12 changed files are not shown in this view.)
