-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathsandbox.py
285 lines (227 loc) · 9.89 KB
/
sandbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# coding: utf-8
"""
Singularity sandbox implementation.
"""
__all__ = ["SingularitySandbox"]
import os
import subprocess
import luigi
import six
from law.config import Config
from law.sandbox.base import Sandbox
from law.target.local import LocalDirectoryTarget, LocalFileTarget
from law.cli.software import get_software_deps
from law.util import make_list, interruptable_popen, quote_cmd, flatten, law_src_path, makedirs
class SingularitySandbox(Sandbox):
    """
    Sandbox that runs commands inside a singularity image via ``singularity exec``.

    The image is given by the sandbox name (see :py:attr:`image`). Configuration values are read
    from the section returned by :py:meth:`get_config_section`. Volume binding (``-B``) and law
    forwarding can be controlled per task via the callable attributes
    ``singularity_allow_binds`` and ``singularity_forward_law``, falling back to the
    ``allow_binds`` and ``forward_law`` config options.
    """

    sandbox_type = "singularity"

    config_section_prefix = sandbox_type

    @property
    def image(self):
        """
        Name of the singularity image, identical to the sandbox name.
        """
        return self.name

    @property
    def env_cache_key(self):
        # the image name serves as the key for the cached sandbox environment
        return self.image

    def get_custom_config_section_postfix(self):
        # presumably used by the base class to look up image-specific config sections
        return self.image

    def _allow_binds(self):
        """
        Returns whether volume binding (``-B``) is allowed. A callable
        ``singularity_allow_binds`` attribute of the task takes precedence over the
        ``allow_binds`` option of the config section.
        """
        allow_binds_cb = getattr(self.task, "singularity_allow_binds", None)
        if callable(allow_binds_cb):
            return allow_binds_cb()

        # parse as a boolean, since a raw config string such as "False" would be truthy
        cfg = Config.instance()
        return cfg.get_expanded_bool(self.get_config_section(), "allow_binds")

    def create_env(self):
        """
        Determines the environment variables inside the container and returns them as a dict.

        Strategy: unlike docker, singularity might not allow binding of paths that do not exist
        in the container. So a tmp directory is created on the host system, bound as /tmp when
        binds are allowed, python inside the container dumps its full env into a pickle file
        there, and the file is read back on the host system. When ``env_cache_path`` is set, an
        existing cache file is loaded instead, and a freshly created env is copied to it.

        Raises an exception when the container command exits with a non-zero code or when the
        env file cannot be deserialized.
        """
        # helper to load the env from a pickled file target
        def load_env(target):
            try:
                # read from the passed target, not from the tmp file, which does not exist yet
                # when the cache file is loaded
                return target.load(formatter="pickle")
            except Exception as e:
                raise Exception(
                    "env deserialization of sandbox {} failed: {}".format(self, e),
                )

        # load the env when the cache file is configured and existing
        env_cache_target = None
        if self.env_cache_path:
            env_cache_target = LocalFileTarget(self.env_cache_path)
            if env_cache_target.exists():
                return load_env(env_cache_target)

        # create tmp dir and file
        tmp_dir = LocalDirectoryTarget(is_tmp=True)
        tmp_dir.touch()
        tmp = tmp_dir.child("env", type="f")
        tmp.touch()

        # arguments to configure the environment, -e clears the environment
        args = ["-e"]
        if self._allow_binds():
            # bind the host tmp dir as /tmp so python in the container can write the env file
            args.extend(["-B", "{}:/tmp".format(tmp_dir.path)])
            env_file = "/tmp/{}".format(tmp.basename)
        else:
            # NOTE(review): without binds, this assumes the host tmp path is also reachable
            # from inside the container - confirm for the images in use
            env_file = tmp.path

        # get the singularity exec command
        singularity_exec_cmd = self._singularity_exec_cmd() + args

        # pre-setup commands
        pre_setup_cmds = self._build_pre_setup_cmds()

        # post-setup commands
        post_env = self._get_env()
        post_setup_cmds = self._build_post_setup_cmds(post_env)

        # build the python command that dumps the environment; protocol 2 is the highest one
        # python 2 can read, presumably chosen so host and container python versions may differ
        py_cmd = "import os,pickle;" \
            + "pickle.dump(dict(os.environ),open('{}','wb'),protocol=2)".format(env_file)

        # build the full command
        cmd = quote_cmd(singularity_exec_cmd + [self.image, "bash", "-l", "-c",
            " && ".join(flatten(
                pre_setup_cmds,
                post_setup_cmds,
                quote_cmd(["python", "-c", py_cmd]),
            )),
        ])

        # run it
        code, out, _ = interruptable_popen(cmd, shell=True, executable="/bin/bash",
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if code != 0:
            raise Exception(
                "singularity sandbox env loading failed with exit code {}:\n{}".format(
                    code, out),
            )

        # copy to the cache path when configured
        if env_cache_target is not None:
            tmp.copy_to_local(env_cache_target)

        # load the env
        return load_env(tmp)

    def _singularity_exec_cmd(self):
        """
        Returns the base ``singularity exec`` command as a list, extended by the arguments
        returned by a callable ``singularity_args`` attribute of the task, if any.
        """
        cmd = ["singularity", "exec"]

        # task-specific arguments
        if self.task:
            # add args configured on the task
            args_getter = getattr(self.task, "singularity_args", None)
            if callable(args_getter):
                cmd.extend(make_list(args_getter()))

        return cmd

    def cmd(self, proxy_cmd):
        """
        Builds the shell command that runs *proxy_cmd* inside the singularity image.

        Depending on whether binds and law forwarding are enabled, law and its software
        dependencies, the law and luigi config files, stage-in/out directories and additional
        volumes are bound into the container below the configured ``forward_dir``. The
        corresponding environment variables are set and passed to the post-setup commands.
        Returns the command as built by ``quote_cmd``.

        Raises an exception when stage-in/out or volume forwarding is requested but binds are
        not allowed.
        """
        # singularity exec command arguments
        # -e clears the environment
        args = ["-e"]

        # helper to build forwarded paths
        cfg = Config.instance()
        cfg_section = self.get_config_section()
        forward_dir = cfg.get_expanded(cfg_section, "forward_dir")
        python_dir = cfg.get_expanded(cfg_section, "python_dir")
        bin_dir = cfg.get_expanded(cfg_section, "bin_dir")
        stagein_dir_name = cfg.get_expanded(cfg_section, "stagein_dir_name")
        stageout_dir_name = cfg.get_expanded(cfg_section, "stageout_dir_name")

        def dst(*parts):
            # path inside the container, relative to the forward dir
            return os.path.join(forward_dir, *(str(part) for part in parts))

        # helper for mounting a volume
        volume_srcs = []

        def mount(*vol):
            src = vol[0]

            # make sure the same source directory is not mounted twice
            if src in volume_srcs:
                return
            volume_srcs.append(src)

            # ensure that source directories exist
            if not os.path.isfile(src):
                makedirs(src)

            # store the mount point
            args.extend(["-B", ":".join(vol)])

        # determine whether volume binding is allowed
        allow_binds = self._allow_binds()

        # determine whether law software forwarding is allowed
        forward_law_cb = getattr(self.task, "singularity_forward_law", None)
        if callable(forward_law_cb):
            forward_law = forward_law_cb()
        else:
            forward_law = cfg.get_expanded_bool(cfg_section, "forward_law")

        # environment variables to set
        env = self._get_env()

        # prevent python from writing byte code files
        env["PYTHONDONTWRITEBYTECODE"] = "1"

        if forward_law:
            # adjust path variables, using the same bin dir that volume expansion uses below
            if allow_binds:
                env["PATH"] = os.pathsep.join([dst(bin_dir), "$PATH"])
                env["PYTHONPATH"] = os.pathsep.join([dst(python_dir), "$PYTHONPATH"])
            else:
                env["PATH"] = "$PATH"
                env["PYTHONPATH"] = "$PYTHONPATH"

            # forward python directories of law and dependencies
            for mod in get_software_deps():
                path = os.path.dirname(mod.__file__)
                name, ext = os.path.splitext(os.path.basename(mod.__file__))
                if name == "__init__":
                    # package: forward the whole package directory
                    vsrc = path
                    vdst = dst(python_dir, os.path.basename(path))
                else:
                    # single-file module: forward the .py file
                    vsrc = os.path.join(path, name + ".py")
                    vdst = dst(python_dir, name + ".py")

                if allow_binds:
                    mount(vsrc, vdst)
                else:
                    # without binds, point PYTHONPATH at the host location instead
                    dep_path = os.path.dirname(vsrc)
                    if dep_path not in env["PYTHONPATH"].split(os.pathsep):
                        env["PYTHONPATH"] = os.pathsep.join([dep_path, env["PYTHONPATH"]])

            # forward the law cli dir to bin as it contains a law executable
            if allow_binds:
                env["PATH"] = os.pathsep.join([dst(python_dir, "law", "cli"), env["PATH"]])
            else:
                env["PATH"] = os.pathsep.join([law_src_path("cli"), env["PATH"]])

            # forward the law config file
            if cfg.config_file:
                if allow_binds:
                    mount(cfg.config_file, dst("law.cfg"))
                    env["LAW_CONFIG_FILE"] = dst("law.cfg")
                else:
                    env["LAW_CONFIG_FILE"] = cfg.config_file

            # forward the luigi config file, iterating the known paths in reverse order and
            # using the first one that exists
            for p in luigi.configuration.LuigiConfigParser._config_paths[::-1]:
                if os.path.exists(p):
                    if allow_binds:
                        mount(p, dst("luigi.cfg"))
                        env["LUIGI_CONFIG_PATH"] = dst("luigi.cfg")
                    else:
                        env["LUIGI_CONFIG_PATH"] = p
                    break

        # add staging directories
        if (self.stagein_info or self.stageout_info) and not allow_binds:
            raise Exception("cannot use stage-in or -out if binds are not allowed")

        if self.stagein_info:
            env["LAW_SANDBOX_STAGEIN_DIR"] = dst(stagein_dir_name)
            mount(self.stagein_info.stage_dir.path, dst(stagein_dir_name))

        if self.stageout_info:
            env["LAW_SANDBOX_STAGEOUT_DIR"] = dst(stageout_dir_name)
            mount(self.stageout_info.stage_dir.path, dst(stageout_dir_name))

        # forward volumes defined in the config and by the task
        vols = self._get_volumes()
        if vols and not allow_binds:
            raise Exception("cannot forward volumes to sandbox if binds are not allowed")

        for hdir, cdir in six.iteritems(vols):
            if not cdir:
                # no container path given, bind to the same path as on the host
                mount(hdir)
            else:
                cdir = self._expand_volume(cdir, bin_dir=dst(bin_dir), python_dir=dst(python_dir))
                mount(hdir, cdir)

        # handle local scheduling within the container
        if self.force_local_scheduler():
            proxy_cmd.add_arg("--local-scheduler", "True", overwrite=True)

        # get the singularity exec command, add arguments from above
        singularity_exec_cmd = self._singularity_exec_cmd() + args

        # pre-setup commands
        pre_setup_cmds = self._build_pre_setup_cmds()

        # post-setup commands with the full env
        post_setup_cmds = self._build_post_setup_cmds(env)

        # build the final command
        cmd = quote_cmd(singularity_exec_cmd + [self.image, "bash", "-l", "-c",
            " && ".join(flatten(
                pre_setup_cmds,
                post_setup_cmds,
                proxy_cmd.build(),
            )),
        ])

        return cmd