Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Facility command and task improvements #7187

Merged
4 changes: 2 additions & 2 deletions kolibri/core/auth/management/commands/deletefacility.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from kolibri.core.auth.management.utils import DisablePostDeleteSignal
from kolibri.core.auth.management.utils import get_facility
from kolibri.core.auth.models import AdHocGroup
from kolibri.core.auth.models import cache
from kolibri.core.auth.models import Classroom
from kolibri.core.auth.models import Collection
from kolibri.core.auth.models import dataset_cache
from kolibri.core.auth.models import FacilityDataset
from kolibri.core.auth.models import FacilityUser
from kolibri.core.auth.models import LearnerGroup
Expand Down Expand Up @@ -205,7 +205,7 @@ def handle_async(self, *args, **options):
count, stats = delete_group.delete(update_progress)
total_deleted += count
# clear related cache
cache.clear()
dataset_cache.clear()

# if count doesn't match, something doesn't seem right
if total_count != total_deleted:
Expand Down
119 changes: 62 additions & 57 deletions kolibri/core/auth/management/commands/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from kolibri.core.auth.constants.morango_sync import State
from kolibri.core.auth.management.utils import get_facility
from kolibri.core.auth.management.utils import run_once
from kolibri.core.auth.models import dataset_cache
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.tasks.utils import db_task_write_lock
from kolibri.utils import conf
Expand Down Expand Up @@ -103,6 +104,8 @@ def handle_async(self, *args, **options):
if not ScopeDefinition.objects.filter():
call_command("loaddata", "scopedefinitions")

dataset_cache.clear()

# try to connect to server
controller = MorangoProfileController(PROFILE_FACILITY_DATA)
network_connection = controller.create_network_connection(baseurl)
Expand Down Expand Up @@ -173,13 +176,9 @@ def handle_async(self, *args, **options):
client_cert, server_cert, chunk_size=chunk_size
)

# setup all progress trackers before starting so CLI progress is accurate
self._setup_progress_tracking(
sync_client, not no_pull, not no_push, noninteractive
)

# pull from server and push our own data to server
if not no_pull:
self._setup_pull_progress_tracking(sync_client, noninteractive)
self._session_tracker_adapter(
sync_client.signals.session,
"Creating pull transfer session",
Expand All @@ -188,6 +187,7 @@ def handle_async(self, *args, **options):
with db_task_write_lock:
sync_client.initiate_pull(Filter(dataset_id))
if not no_push:
self._setup_push_progress_tracking(sync_client, noninteractive)
self._session_tracker_adapter(
sync_client.signals.session,
"Creating push transfer session",
Expand All @@ -209,69 +209,70 @@ def handle_async(self, *args, **options):

logger.info("Syncing has been completed.")

def _setup_progress_tracking(self, sync_client, pulling, pushing, noninteractive):
def _setup_pull_progress_tracking(self, sync_client, noninteractive):
"""
Sets up progress trackers for the various sync stages
Sets up progress trackers for pull stages

:type sync_client: morango.sync.syncsession.SyncClient
:type pulling: bool
:type pushing: bool
:type noninteractive: bool
"""
transfer_message = "{records_transferred}/{records_total}, {transfer_total}"

if pulling:
self._queueing_tracker_adapter(
sync_client.signals.queuing,
"Remotely preparing data",
State.REMOTE_QUEUING,
False,
noninteractive,
)
self._transfer_tracker_adapter(
sync_client.signals.pulling,
"Receiving data ({})".format(transfer_message),
State.PULLING,
noninteractive,
)
self._queueing_tracker_adapter(
sync_client.signals.dequeuing,
"Locally integrating received data",
State.LOCAL_DEQUEUING,
False,
noninteractive,
)
self._queueing_tracker_adapter(
sync_client.signals.queuing,
"Remotely preparing data",
State.REMOTE_QUEUING,
False,
noninteractive,
)
self._transfer_tracker_adapter(
sync_client.signals.pulling,
"Receiving data ({})".format(transfer_message),
State.PULLING,
noninteractive,
)
self._queueing_tracker_adapter(
sync_client.signals.dequeuing,
"Locally integrating received data",
State.LOCAL_DEQUEUING,
False,
noninteractive,
)

if pushing:
self._queueing_tracker_adapter(
sync_client.signals.queuing,
"Locally preparing data to send",
State.LOCAL_QUEUING,
True,
noninteractive,
)
self._transfer_tracker_adapter(
sync_client.signals.pushing,
"Sending data ({})".format(transfer_message),
State.PUSHING,
noninteractive,
)
self._queueing_tracker_adapter(
sync_client.signals.dequeuing,
"Remotely integrating data",
State.REMOTE_DEQUEUING,
True,
noninteractive,
)
def _setup_push_progress_tracking(self, sync_client, noninteractive):
"""
Sets up progress trackers for push stages

:type sync_client: morango.sync.syncsession.SyncClient
:type noninteractive: bool
"""
transfer_message = "{records_transferred}/{records_total}, {transfer_total}"

self._queueing_tracker_adapter(
sync_client.signals.queuing,
"Locally preparing data to send",
State.LOCAL_QUEUING,
True,
noninteractive,
)
self._transfer_tracker_adapter(
sync_client.signals.pushing,
"Sending data ({})".format(transfer_message),
State.PUSHING,
noninteractive,
)
self._queueing_tracker_adapter(
sync_client.signals.dequeuing,
"Remotely integrating data",
State.REMOTE_DEQUEUING,
True,
noninteractive,
)

def _update_all_progress(self, progress_fraction, progress):
"""
Override parent progress update callback to report from all of our progress trackers
Override parent progress update callback to report from the progress tracker we're sent
"""
total_progress = sum([p.progress for p in self.progresstrackers])
total = sum([p.total for p in self.progresstrackers])
progress_fraction = total_progress / float(total) if total > 0.0 else 0.0

if self.job:
self.job.update_progress(progress_fraction, 1.0)
self.job.extra_metadata.update(progress.extra_data)
Expand Down Expand Up @@ -374,7 +375,10 @@ def _queueing_tracker_adapter(
tracker = self.start_progress(total=2)

def started(transfer_session):
if transfer_session.push == is_push:
dataset_cache.clear()
if transfer_session.push == is_push and (
noninteractive or tracker.progressbar is None
):
logger.info(message)

def handler(transfer_session):
Expand All @@ -386,5 +390,6 @@ def handler(transfer_session):
if noninteractive or tracker.progressbar is None:
signal_group.started.connect(started)

signal_group.started.connect(started)
signal_group.started.connect(handler)
signal_group.completed.connect(handler)
2 changes: 1 addition & 1 deletion kolibri/core/auth/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def _interactive_client_facility_selection():
facilities = Facility.objects.all().order_by("name")
message = "Please choose a facility to sync:\n"
message = "Please choose a facility:\n"
for idx, facility in enumerate(facilities):
message += "{}. {}\n".format(idx + 1, facility.name)
idx = input(message)
Expand Down
6 changes: 4 additions & 2 deletions kolibri/core/auth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@
from kolibri.core.device.utils import set_device_settings
from kolibri.core.errors import KolibriValidationError
from kolibri.core.fields import DateTimeTzField
from kolibri.core.utils.cache import NamespacedCacheProxy
from kolibri.utils.time_utils import local_now

logger = logging.getLogger(__name__)
dataset_cache = NamespacedCacheProxy(cache, "dataset")


def _has_permissions_class(obj):
Expand Down Expand Up @@ -186,13 +188,13 @@ def cached_related_dataset_lookup(self, related_obj_name):
key = "{id}_{db_table}_dataset".format(
id=getattr(self, field.attname), db_table=field.related_model._meta.db_table
)
dataset_id = cache.get(key)
dataset_id = dataset_cache.get(key)
if dataset_id is None:
try:
dataset_id = getattr(self, related_obj_name).dataset_id
except ObjectDoesNotExist as e:
raise ValidationError(e)
cache.set(key, dataset_id, 60 * 10)
dataset_cache.set(key, dataset_id, 60 * 10)
return dataset_id

def calculate_source_id(self):
Expand Down
4 changes: 2 additions & 2 deletions kolibri/core/tasks/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,11 +989,11 @@ def validate_prepare_sync_job(request, **kwargs):

job_data = dict(
facility=facility_id,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=dict(),
track_progress=True,
cancellable=True,
cancellable=False,
)

job_data.update(kwargs)
Expand Down
36 changes: 18 additions & 18 deletions kolibri/core/tasks/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_startdataportalsync(self, facility_queue):
job_id=123,
state="testing",
percentage_progress=42,
cancellable=True,
cancellable=False,
extra_metadata=dict(this_is_extra=True,),
)
facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
Expand All @@ -190,7 +190,7 @@ def test_startdataportalsync(self, facility_queue):
call_command,
"sync",
facility=self.facility.id,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=dict(
facility=self.facility.id,
Expand All @@ -202,7 +202,7 @@ def test_startdataportalsync(self, facility_queue):
type="SYNCDATAPORTAL",
),
track_progress=True,
cancellable=True,
cancellable=False,
)

def test_startdataportalbulksync(self, facility_queue):
Expand All @@ -217,7 +217,7 @@ def test_startdataportalbulksync(self, facility_queue):
job_id=123,
state="testing",
percentage_progress=42,
cancellable=True,
cancellable=False,
extra_metadata=dict(this_is_extra=True,),
)
facility_queue.fetch_job.return_value = fake_job(**fake_job_data)
Expand All @@ -234,7 +234,7 @@ def test_startdataportalbulksync(self, facility_queue):
call_command,
"sync",
facility=facility2.id,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=dict(
facility=facility2.id,
Expand All @@ -246,13 +246,13 @@ def test_startdataportalbulksync(self, facility_queue):
type="SYNCDATAPORTAL",
),
track_progress=True,
cancellable=True,
cancellable=False,
),
call(
call_command,
"sync",
facility=facility3.id,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=dict(
facility=facility3.id,
Expand All @@ -264,7 +264,7 @@ def test_startdataportalbulksync(self, facility_queue):
type="SYNCDATAPORTAL",
),
track_progress=True,
cancellable=True,
cancellable=False,
),
]
)
Expand Down Expand Up @@ -296,11 +296,11 @@ def test_startpeerfacilityimport(
baseurl="https://some.server.test/",
facility=self.facility.id,
no_push=True,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=extra_metadata,
track_progress=True,
cancellable=True,
cancellable=False,
)
validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()

Expand All @@ -309,7 +309,7 @@ def test_startpeerfacilityimport(
job_id=123,
state="testing",
percentage_progress=42,
cancellable=True,
cancellable=False,
extra_metadata=dict(this_is_extra=True,),
)
fake_job_data["extra_metadata"].update(extra_metadata)
Expand Down Expand Up @@ -354,11 +354,11 @@ def test_startpeerfacilitysync(
prepared_data = dict(
baseurl="https://some.server.test/",
facility=self.facility.id,
chunk_size=50,
chunk_size=200,
noninteractive=True,
extra_metadata=extra_metadata,
track_progress=True,
cancellable=True,
cancellable=False,
)
validate_and_prepare_peer_sync_job.return_value = prepared_data.copy()

Expand All @@ -367,7 +367,7 @@ def test_startpeerfacilitysync(
job_id=123,
state="testing",
percentage_progress=42,
cancellable=True,
cancellable=False,
extra_metadata=dict(this_is_extra=True,),
)
fake_job_data["extra_metadata"].update(extra_metadata)
Expand Down Expand Up @@ -494,10 +494,10 @@ def test_validate_prepare_sync_job(self):

expected = dict(
facility=123,
chunk_size=50,
chunk_size=200,
noninteractive=True,
track_progress=True,
cancellable=True,
cancellable=False,
extra_metadata=dict(type="test"),
)
actual = validate_prepare_sync_job(req, extra_metadata=dict(type="test"))
Expand Down Expand Up @@ -550,10 +550,10 @@ def test_validate_and_prepare_peer_sync_job(
expected = dict(
baseurl="https://some.server.test/",
facility=123,
chunk_size=50,
chunk_size=200,
noninteractive=True,
track_progress=True,
cancellable=True,
cancellable=False,
extra_metadata=dict(type="test"),
)
actual = validate_and_prepare_peer_sync_job(
Expand Down
Loading