Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel transfer to the Tucson mirror #61

Merged
merged 12 commits into from
Sep 26, 2024
8 changes: 4 additions & 4 deletions bin/desi_tucson_transfer_catchup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ src=rsync://${DESISYNC_HOSTNAME}/desi
dst=${DESI_ROOT}
log_root=${HOME}/Documents/Logfiles
#
# Execute rsync commands.
# Execute rsync commands. Do not exceed 10 commands!
#
for d in engineering/focalplane engineering/focalplane/hwtables \
spectro/data \
spectro/redux/daily spectro/redux/daily/exposures spectro/redux/daily/preproc spectro/redux/daily/tiles \
spectro/nightwatch/kpno spectro/staging/lost+found; do
case ${d} in
engineering/focalplane) priority='nice'; exclude='--exclude archive --exclude hwtables --exclude *.ipynb --exclude .ipynb_checkpoints' ;;
engineering/focalplane/hwtables) priority='nice'; exclude='--include *.csv --exclude *' ;;
# engineering/focalplane/hwtables) priority='nice'; exclude='--include *.csv --exclude *' ;;
spectro/data) priority=''; exclude='--exclude 2018* --exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023*' ;;
spectro/nightwatch/kpno) priority='nice'; exclude='--exclude 2021* --exclude 2022* --exclude 2023*' ;;
spectro/redux/daily) priority=''; exclude='--exclude *.tmp --exclude attic --exclude exposures --exclude preproc --exclude temp --exclude tiles' ;;
spectro/redux/daily/exposures) priority=''; exclude='--exclude *.tmp' ;;
spectro/redux/daily/preproc) priority=''; exclude='--exclude *.tmp --exclude preproc-*.fits --exclude preproc-*.fits.gz' ;;
spectro/redux/daily/exposures) priority=''; exclude='--exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023* --exclude *.tmp' ;;
spectro/redux/daily/preproc) priority=''; exclude='--exclude 2019* --exclude 2020* --exclude 2021* --exclude 2022* --exclude 2023* --exclude *.tmp --exclude preproc-*.fits --exclude preproc-*.fits.gz' ;;
spectro/redux/daily/tiles) priority=''; exclude='--exclude *.tmp --exclude temp' ;;
*) priority='nice'; exclude='' ;;
esac
Expand Down
4 changes: 3 additions & 1 deletion doc/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Change Log
1.0.3 (unreleased)
------------------

* No changes yet.
* Convert Tucson transfer to parallel operation (PR `#61`_).

.. _`#61`: https://github.com/desihub/desitransfer/pull/61

1.0.2 (2024-06-21)
------------------
Expand Down
55 changes: 51 additions & 4 deletions py/desitransfer/test/test_tucson.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import unittest
import logging
import subprocess as sub
from unittest.mock import patch, call, mock_open
from ..tucson import _options, _rsync, _configure_log, running
from tempfile import mkdtemp
from shutil import rmtree
from unittest.mock import patch, call, mock_open, MagicMock
from ..tucson import _options, _rsync, _configure_log, running, _get_proc
from .. import __version__ as dtVersion


Expand All @@ -18,11 +20,11 @@ class TestTucson(unittest.TestCase):

@classmethod
def setUpClass(cls):
pass
cls.temp_dir = mkdtemp()

@classmethod
def tearDownClass(cls):
pass
rmtree(cls.temp_dir)

def setUp(self):
pass
Expand Down Expand Up @@ -164,3 +166,48 @@ def test_running_read_exit(self, mock_log, mock_popen, mock_exists, mock_remove)
stdout=sub.PIPE, stderr=sub.PIPE),
call().communicate()])
mock_popen().communicate.assert_called_once()

@patch('subprocess.Popen')
@patch('desitransfer.tucson.log')
@patch('desitransfer.tucson.priority')
def test_get_proc(self, mock_priority, mock_log, mock_popen):
    """Test the function for generating external procedures.
    """
    home = os.environ['HOME']

    def rsync_call(d):
        # The log.info() call expected for the rsync command on directory d.
        return call('/usr/bin/rsync --archive --checksum --verbose --delete --delete-after '
                    f'--no-motd --password-file {home}/.desi /src/{d}/ /dst/{d}/')

    directories = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']
    exclude = {'d', 'g'}
    # Only 'e' and 'f' are treated as priority (not nice'd) directories.
    mock_priority.__contains__ = lambda self, x: x == 'e' or x == 'f'
    options = MagicMock()
    options.test = False
    options.log = self.temp_dir
    #
    # Outside of test mode, _get_proc() opens a real log file even though
    # Popen is mocked, so every handle must be closed when the test ends.
    #
    proc, LOG_A, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.addCleanup(LOG_A.close)
    self.assertEqual(d, 'a')
    #
    # In test mode the log file *name* is returned and nothing is opened.
    #
    options.test = True
    proc, LOG_B, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertEqual(d, 'b')
    self.assertEqual(LOG_B, os.path.join(self.temp_dir, 'desi_tucson_transfer_b.log'))
    proc, LOG_C, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertEqual(d, 'c')
    #
    # 'd' and 'g' are excluded, so the remaining calls yield e, f, h, i.
    #
    options.test = False
    for expected in ('e', 'f', 'h', 'i'):
        proc, LOG, d = _get_proc(directories, exclude, '/src', '/dst', options)
        self.addCleanup(LOG.close)
        self.assertEqual(d, expected)
    #
    # The list is now exhausted.
    #
    proc, LOG_J, d = _get_proc(directories, exclude, '/src', '/dst', options)
    self.assertIsNone(proc)
    self.assertIsNone(LOG_J)
    self.assertIsNone(d)
    mock_log.info.assert_has_calls([rsync_call('a'),
                                    call("Directory '%s' will be transferred with os.nice(%d)", 'a', 5),
                                    rsync_call('e'),
                                    rsync_call('f'),
                                    rsync_call('h'),
                                    call("Directory '%s' will be transferred with os.nice(%d)", 'h', 5),
                                    rsync_call('i'),
                                    call("Directory '%s' will be transferred with os.nice(%d)", 'i', 5)])
    mock_log.warning.assert_has_calls([call('%s skipped at user request.', 'd'),
                                       call('%s skipped at user request.', 'g')])
121 changes: 102 additions & 19 deletions py/desitransfer/tucson.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,37 @@
'spectro/redux/daily/preproc',
'spectro/redux/daily/tiles',
'engineering/focalplane',
'engineering/focalplane/hwtables',
'software/AnyConnect',
'software/CiscoSecureClient']


includes = {'engineering/focalplane': ["--exclude", "archive", "--exclude", "hwtables",
"--exclude", ".ipynb_checkpoints", "--exclude", "*.ipynb"],
'engineering/focalplane/hwtables': ["--include", "*.csv", "--exclude", "*"],
# 'engineering/focalplane/hwtables': ["--include", "*.csv", "--exclude", "*"],
'spectro/desi_spectro_calib': ["--exclude", ".svn"],
'spectro/data': exclude_years(2018),
'spectro/nightwatch/kpno': exclude_years(2021),
'spectro/redux/daily': ["--exclude", "*.tmp", "--exclude", "attic",
"--exclude", "exposures", "--exclude", "preproc",
"--exclude", "temp", "--exclude", "tiles"],
'spectro/redux/daily/exposures': ["--exclude", "*.tmp"],
'spectro/redux/daily/preproc': ["--exclude", "*.tmp", "--exclude", "preproc-*.fits",
"--exclude", "preproc-*.fits.gz"],
'spectro/redux/daily/exposures': exclude_years(2019) + ["--exclude", "*.tmp"],
'spectro/redux/daily/preproc': exclude_years(2019) + ["--exclude", "*.tmp", "--exclude", "preproc-*.fits",
"--exclude", "preproc-*.fits.gz"],
'spectro/redux/daily/tiles': ["--exclude", "*.tmp", "--exclude", "temp"],
'spectro/templates/basis_templates': ["--exclude", ".svn", "--exclude", "basis_templates_svn-old"],
'survey/ops/surveyops/trunk': ["--exclude", ".svn", "--exclude", "cronupdate.log"],
'target/catalogs': ["--include", "dr8", "--include", "dr9",
"--include", "gaiadr2", "--include", "subpriority", "--exclude", "*"]}


# Directories listed here are started without any change to process
# priority; _get_proc() starts every other directory's transfer with
# os.nice() applied.
priority = ('spectro/data',
            'spectro/redux/daily',
            'spectro/redux/daily/exposures',
            'spectro/redux/daily/preproc',
            'spectro/redux/daily/tiles')


def _configure_log(debug):
"""Re-configure the default logger returned by ``desiutil.log``.

Expand Down Expand Up @@ -130,6 +138,9 @@ def _options():
prsr.add_argument('-l', '--log', metavar='DIR',
default=os.path.join(os.environ['HOME'], 'Documents', 'Logfiles'),
help='Use DIR for log files (default %(default)s).')
prsr.add_argument('-p', '--processes', action='store', type=int,
dest='nproc', metavar="N", default=10,
help="Number of simultaneous downloads (default %(default)s).")
prsr.add_argument('-s', '--static', action='store_true', dest='static',
help='Also sync static data sets.')
prsr.add_argument('-S', '--sleep', metavar='TIME', default='15m', dest='sleep',
Expand Down Expand Up @@ -166,6 +177,61 @@ def _rsync(src, dst, d, checksum=False):
return cmd


def _get_proc(directories, exclude, src, dst, options, nice=5):
    """Prepare the next download directory for processing.

    The next directory that is not in `exclude` is removed from the front
    of `directories`, so `directories` is modified in place. A warning is
    logged for every excluded directory that is skipped along the way.

    Parameters
    ----------
    directories : :class:`list`
        A list of directories to process.
    exclude : :class:`set`
        Do not process directories in this set.
    src : :class:`str`
        Root source directory.
    dst : :class:`str`
        Root destination directory.
    options : :class:`argparse.Namespace`
        The parsed command-line options.
    nice : :class:`int`, optional
        Lower-priority transfers will be run with this value passed to :func:`os.nice`,
        default 5.

    Returns
    -------
    :class:`tuple`
        A tuple containing information about the process. There are three cases:

        * ``(None, None, None)`` if there are no directories left to process.
        * ``(command, log_file, d)`` if ``options.test`` is set: the rsync command,
          the *name* of the log file, and the directory. Nothing is opened or executed.
        * ``(proc, LOG, d)`` otherwise: the running :class:`subprocess.Popen` object,
          the open log file object and the directory. The caller is responsible
          for closing ``LOG``.
    """
    def preexec_nice():  # pragma: no cover
        os.nice(nice)

    #
    # Only running out of directories means "nothing left to do". Keep the
    # try block this narrow so that an IndexError raised anywhere else is
    # not silently reported as an empty work list.
    #
    try:
        d = directories.pop(0)
        while d in exclude:
            log.warning("%s skipped at user request.", d)
            d = directories.pop(0)
    except IndexError:
        return (None, None, None)
    log_file = os.path.join(options.log,
                            'desi_tucson_transfer_' + d.replace('/', '_') + '.log')
    command = _rsync(src, dst, d, checksum=options.checksum)
    if options.test:
        return (command, log_file, d)
    log.info(' '.join(command))
    LOG = open(log_file, 'ab')
    if d in priority:
        # Priority directories run without any adjustment.
        preexec_fn = None
    else:
        log.info("Directory '%s' will be transferred with os.nice(%d)", d, nice)
        preexec_fn = preexec_nice
    try:
        proc = sub.Popen(command, preexec_fn=preexec_fn, stdout=LOG, stderr=sub.STDOUT)
    except BaseException:
        # The caller never receives LOG if the process could not be started,
        # so close it here before propagating the error.
        LOG.close()
        raise
    return (proc, LOG, d)


def running(pid_file):
"""Test for a duplicate process already running.

Expand Down Expand Up @@ -216,8 +282,15 @@ def main():
try:
foo = os.environ[e]
except KeyError:
log.error("%s must be set!", e)
log.critical("%s must be set!", e)
return 1

#
# Check other options.
#
if options.nproc > 10:
log.critical("Number of simultaneous transfers %d > 10!", options.nproc)
return 1
#
# Source and destination.
#
Expand All @@ -226,7 +299,7 @@ def main():
if 'DESI_ROOT' in os.environ:
dst = os.environ['DESI_ROOT']
else:
log.error("DESI_ROOT must be set, or destination directory set on the command-line (-d DIR)!")
log.critical("DESI_ROOT must be set, or destination directory set on the command-line (-d DIR)!")
return 1
else:
dst = options.destination
Expand All @@ -248,7 +321,7 @@ def main():
try:
sleepy_time = int(options.sleep[0:-1]) * suffix[s]
except ValueError:
log.error("Invalid value for sleep interval: '%s'!", options.sleep)
log.critical("Invalid value for sleep interval: '%s'!", options.sleep)
return 1
log.debug("requests.get('%s')", os.environ['DESISYNC_STATUS_URL'])
if not options.test:
Expand All @@ -266,18 +339,28 @@ def main():
directories = static + dynamic
else:
directories = dynamic
for d in directories:
if d in exclude:
log.warning("%s skipped at user request.", d)
else:
command = _rsync(src, dst, d, checksum=options.checksum)
log.info(' '.join(command))
if not options.test:
log_file = os.path.join(options.log,
'desi_tucson_transfer_' + d.replace('/', '_') + '.log')
with open(log_file, 'ab') as LOG:
proc = sub.Popen(command, stdout=LOG, stderr=sub.STDOUT)
status = proc.wait()
proc_pool = dict()
for p in range(options.nproc):
proc_key = 'proc{0:03d}'.format(p)
proc_pool[proc_key] = _get_proc(directories, exclude, src, dst, options)
while any([v[0] is not None for v in proc_pool.values()]):
for proc_key in proc_pool:
proc, LOG, d = proc_pool[proc_key]
if proc is None:
status = None
else:
if options.test:
log.debug("%s: %s -> %s", d, ' '.join(proc), LOG)
status = 0
else:
status = proc.poll()
if status is not None:
if not options.test:
LOG.close()
if status != 0:
log.critical("rsync error detected for %s/%s/! Check logs!", dst, d)
proc_pool[proc_key] = _get_proc(directories, exclude, src, dst, options)
if not options.test:
log.debug("Waiting for jobs to complete, sleeping %s.", options.sleep)
time.sleep(sleepy_time)
return 0