Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DIRAC management #103

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions src/nectarchain/data/management.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# The DIRAC magic 2 lines !
try:
import DIRAC

DIRAC.initialize()
except ImportError:
pass

import glob
import logging
import os
Expand All @@ -24,6 +16,18 @@

__all__ = ["DataManagement"]

# The DIRAC magic 2 lines !
try:
import DIRAC

DIRAC.initialize()

[Codecov / codecov/patch — check warning on line 23 in src/nectarchain/data/management.py (src/nectarchain/data/management.py#L23): added line #L23 was not covered by tests. View check run for this annotation.]
except ImportError:
log.warning("DIRAC probably not installed")
pass
except Exception as e:
log.warning(f"DIRAC could not be properly initialized: {e}")
pass

[Codecov / codecov/patch — check warning on line 29 in src/nectarchain/data/management.py (src/nectarchain/data/management.py#L27-L29): added lines #L27–L29 were not covered by tests. View check run for this annotation.]


class DataManagement:
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

function usage ()
{
echo "Usage: `basename $0` -r <run number>"
echo "Usage: $(basename $0) -r <run number>"
}

function help ()
Expand Down Expand Up @@ -55,6 +55,7 @@ function exit_script() {
# Some cleanup before leaving:
# [ -d $CONTAINER ] && rm -rf $CONTAINER
# [ -f $CONTAINER ] && rm -f $CONTAINER
[ -d $NECTARCAMDATA ] && rm -rf $NECTARCAMDATA
[ -d $OUTDIR ] && rm -rf $OUTDIR
[ -f ${OUTDIR}.tar.gz ] && rm -f ${OUTDIR}.tar.gz
[ -d ${OUTDIR} ] && rm -rf ${OUTDIR}
Expand All @@ -63,22 +64,29 @@ function exit_script() {
exit $return_code
}

export NECTARCAMDATA=$PWD/runs
[ ! -d $NECTARCAMDATA ] && mkdir -p $NECTARCAMDATA || exit_script $?
mv nectarcam*.sqlite NectarCAM.Run*.fits.fz $NECTARCAMDATA/.

# Halim's DQM code needs to use a specific output directory:
export NECTARDIR=$PWD/$OUTDIR
[ ! -d $NECTARDIR ] && mkdir -p $NECTARDIR || exit_script $?
# mv nectarcam*.sqlite NectarCAM.Run*.fits.fz $NECTARDIR/.

LISTRUNS=""
for run in $PWD/NectarCAM.Run${runnb}.*.fits.fz; do
LISTRUNS="$LISTRUNS $(basename $run)"
done
#LISTRUNS=""
#for run in $NECTARCAMDATA/NectarCAM.Run${runnb}.*.fits.fz; do
# LISTRUNS="$LISTRUNS $(basename $run)"
#done

# Create a wrapper BASH script with cleaned environment, see https://redmine.cta-observatory.org/issues/51483
cat > $WRAPPER <<EOF
#!/bin/env bash
echo "Cleaning environment \$CLEANED_ENV"
[ -z "\$CLEANED_ENV" ] && exec /bin/env -i CLEANED_ENV="Done" HOME=\${HOME} SHELL=/bin/bash /bin/bash -l "\$0" "\$@"

# From https://github.com/DIRACGrid/COMDIRAC/wiki/Injob
# initialize job for COMDIRAC commands
export DCOMMANDS_CONFIG_DIR=$PWD
dconfig --guess
dinit --fromProxy

# Some environment variables related to python, to be passed to container, be it for old Singularity version or recent Apptainer ones:
export SINGULARITYENV_MPLCONFIGDIR=/tmp
Expand All @@ -102,7 +110,8 @@ fi
echo
echo "Running"
# Instantiate the nectarchain Singularity image, run our DQM example run within it:
cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_calib.py $PWD $NECTARDIR -i $LISTRUNS"
# cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_dqm.py --r0 $NECTARCAMDATA $NECTARDIR -i $LISTRUNS"
cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_dqm.py --r0 -r $runnb $NECTARCAMDATA $NECTARDIR"
echo \$cmd
eval \$cmd
EOF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,72 @@
# Time-stamp: "2023-05-30 13:09:04 jlenain"

import argparse
import sys
import logging
import sys
from time import sleep

# The magic DIRAC 2 lines
import DIRAC

# astropy imports
from astropy import time
from astropy import units as u

DIRAC.initialize()

# DIRAC imports
from DIRAC.Interfaces.API.Dirac import Dirac
from DIRAC.Interfaces.API.Job import Job
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient

logging.basicConfig(format='[%(levelname)s] %(message)s')
logging.basicConfig(format="[%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

dirac = Dirac()

# Option and argument parser
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--date',
default=None,
help='date for which NectarCAM runs should be processed',
type=str)
parser.add_argument('-r', '--run',
default=None,
help='only process a specific run (optional)',
type=str)
parser.add_argument('--dry-run',
action='store_true',
default=False,
help='dry run (does not actually submit jobs)')
parser.add_argument('--log',
default='info',
help='debug output',
type=str)
parser.add_argument(
"-d",
"--date",
default=None,
help="date for which NectarCAM runs should be processed",
type=str,
)
parser.add_argument(
"-r", "--run", default=None, help="only process a specific run (optional)", type=str
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="dry run (does not actually submit jobs)",
)
parser.add_argument("--log", default="info", help="debug output", type=str)
args = parser.parse_args()

logger.setLevel(args.log.upper())

if args.date is None:
logger.critical('A date should be provided, in a format astropy.time.Time compliant. E.g. "2022-04-01".')
logger.critical(
'A date should be provided, in a format astropy.time.Time compliant. E.g. "2022-04-01".'
)
sys.exit(1)

executable_wrapper="dqm_processor.sh"
executable_wrapper = "dqm_processor.sh"

## Possible massive job processing via loop on run numbers:
# for run in ['2720', '3277', '...']:

## or from DIRAC FileCatalog directory listing:
processDate = time.Time(args.date)
dfcDir = f'/vo.cta.in2p3.fr/nectarcam/{processDate.ymdhms[0]}/{processDate.ymdhms[0]}{str(processDate.ymdhms[1]).zfill(2)}{str(processDate.ymdhms[2]).zfill(2)}'
dfcDir = f"/vo.cta.in2p3.fr/nectarcam/{processDate.ymdhms[0]}/{processDate.ymdhms[0]}{str(processDate.ymdhms[1]).zfill(2)}{str(processDate.ymdhms[2]).zfill(2)}"

# The relevant DB file may be stored in the directory corresponding to the day after:
processDateTomorrow = processDate + 1. * u.day
dfcDirTomorrow = f'/vo.cta.in2p3.fr/nectarcam/{processDateTomorrow.ymdhms[0]}/{processDateTomorrow.ymdhms[0]}{str(processDateTomorrow.ymdhms[1]).zfill(2)}{str(processDateTomorrow.ymdhms[2]).zfill(2)}'
processDateTomorrow = processDate + 1.0 * u.day
dfcDirTomorrow = f"/vo.cta.in2p3.fr/nectarcam/{processDateTomorrow.ymdhms[0]}/{processDateTomorrow.ymdhms[0]}{str(processDateTomorrow.ymdhms[1]).zfill(2)}{str(processDateTomorrow.ymdhms[2]).zfill(2)}"

# Sometimes, for unkown reason, the connection to the DFC can fail, try a few times:
# Sometimes, for unknown reason, the connection to the DFC can fail, try a few times:
sleep_time = 2
num_retries = 3
for x in range(0, num_retries):
Expand All @@ -76,77 +84,91 @@
else:
break
if not dfc:
logger.fatal(f'Connection to FileCatalogClient failed, aborting...')
logger.fatal(f"Connection to FileCatalogClient failed, aborting...")
sys.exit(1)

infos = dfc.listDirectory(dfcDir)
infosTomorrow = dfc.listDirectory(dfcDirTomorrow)
if not infos['OK'] or not infos['Value']['Successful']:
logger.critical(f"Could not properly retrieve the file metadata for {dfcDir} ... Exiting !")
if not infos["OK"] or not infos["Value"]["Successful"]:
logger.critical(
f"Could not properly retrieve the file metadata for {dfcDir} ... Exiting !"
)
sys.exit(1)
if not infosTomorrow['OK'] or not infosTomorrow['Value']['Successful']:
logger.warning(f"Could not properly retrieve the file metadata for {dfcDirTomorrow} ... Continuing !")
meta = infos['Value']['Successful'][dfcDir]
if not infosTomorrow["OK"] or not infosTomorrow["Value"]["Successful"]:
logger.warning(
f"Could not properly retrieve the file metadata for {dfcDirTomorrow} ... Continuing !"
)
meta = infos["Value"]["Successful"][dfcDir]
try:
metaTomorrow = infosTomorrow['Value']['Successful'][dfcDirTomorrow]
metaTomorrow = infosTomorrow["Value"]["Successful"][dfcDirTomorrow]
except KeyError:
metaTomorrow = None

runlist = []

sqlfilelist = []
for f in meta['Files']:
if f.endswith('.fits.fz'):
run = f.split('NectarCAM.Run')[1].split('.')[0]
for f in meta["Files"]:
if f.endswith(".fits.fz"):
run = f.split("NectarCAM.Run")[1].split(".")[0]
if run not in runlist and run is not None:
runlist.append(run)
if f.endswith('.sqlite'):
if f.endswith(".sqlite"):
sqlfilelist.append(f)
if metaTomorrow:
for f in metaTomorrow['Files']:
if f.endswith('.sqlite'):
for f in metaTomorrow["Files"]:
if f.endswith(".sqlite"):
sqlfilelist.append(f)
if args.run is not None:
if args.run not in runlist:
logger.critical(f'Your specified run {args.run} was not found in {dfcDir}, aborting...')
logger.critical(
f"Your specified run {args.run} was not found in {dfcDir}, aborting..."
)
sys.exit(1)
runlist = [args.run]
logger.info(f'Found runs {runlist} in {dfcDir}')

logger.info(f"Found runs {runlist} in {dfcDir}")

if len(sqlfilelist) == 0:
logger.critical('Could not find any SQLite file in {dfcDir} nor in {dfcDirTomorrow}, aborting...')
logger.critical(
"Could not find any SQLite file in {dfcDir} nor in {dfcDirTomorrow}, aborting..."
)
sys.exit(1)
logger.info(f'Found SQLite files {sqlfilelist} in {dfcDir} and {dfcDirTomorrow}')
logger.info(f"Found SQLite files {sqlfilelist} in {dfcDir} and {dfcDirTomorrow}")

# Now, submit the DIRAC jobs:
# for run in ['2721']:
for run in runlist:
j = Job()
# j.setExecutable(f'{executable_wrapper}', '<SOME POSSIBLE ARGUMENTS such as run number>')
j.setExecutable(f'{executable_wrapper}', f'-r {run}')
j.setExecutable(f"{executable_wrapper}", f"-r {run}")
# Force job to be run from a given Computing Element:
# j.setDestination('LCG.GRIF.fr')
j.setName(f'NectarCAM DQM run {run}')
j.setJobGroup('NectarCAM DQM')
sandboxlist = [f'{executable_wrapper}']
for f in meta['Files']:
if f.endswith('.fits.fz') and f'NectarCAM.Run{run}' in f:
sandboxlist.append(f'LFN:{f}')
j.setName(f"NectarCAM DQM run {run}")
j.setJobGroup("NectarCAM DQM")
sandboxlist = [f"{executable_wrapper}"]
for f in meta["Files"]:
if f.endswith(".fits.fz") and f"NectarCAM.Run{run}" in f:
sandboxlist.append(f"LFN:{f}")
for s in sqlfilelist:
sandboxlist.append(f'LFN:{s}')
sandboxlist.append(f"LFN:{s}")
if len(sandboxlist) < 2:
logger.critical(f'''Misformed sandboxlist, actual data .fits.fz files missing:
logger.critical(
f"""Misformed sandboxlist, actual data .fits.fz files missing:
{sandboxlist}

Aborting...
''')
"""
)
sys.exit(1)
logger.info(f'''Submitting job for run {run}, with the following InputSandbox:
logger.info(
f"""Submitting job for run {run}, with the following InputSandbox:
{sandboxlist}
''')
"""
)
j.setInputSandbox(sandboxlist)

if not args.dry_run:
res = dirac.submitJob(j) # , mode='local') # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
res = dirac.submitJob(
j
) # , mode='local') # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
logger.info(f"Submission Result: {res['Value']}")
Loading