Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: initial support for ftp locations #25

Merged
merged 51 commits into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
32c800f
initial support for ftp locations
mr-c Aug 17, 2018
9767b5b
better fallthrough support
mr-c Aug 17, 2018
4bfe7fd
fix netrc detection
mr-c Aug 17, 2018
d592bab
better isdir
mr-c Aug 17, 2018
bdc8723
proper logging of cache hit
mr-c Aug 17, 2018
cb4a8bf
calm down conformance test
mr-c Aug 17, 2018
9f44ade
upload and rewrite the input object
mr-c Aug 23, 2018
a010ac9
reuse ftp connection
mr-c Aug 23, 2018
2213d50
custom PathMapper
mr-c Aug 23, 2018
ad5d4cd
set URL for outputs using the remote storage url
mr-c Aug 23, 2018
f23adff
add netrc support for open
mr-c Aug 23, 2018
62f93d5
always try to create target directory
mr-c Aug 23, 2018
eff3d41
per run folder, segregate inputs
mr-c Aug 24, 2018
176ac40
use a regular PathMapper if not in remote storage mode
mr-c Aug 24, 2018
d8ba75d
Don't test PRs twice.w
mr-c Aug 24, 2018
77bcfef
use working docker image
mr-c Sep 6, 2018
81cabc4
fix glob logic
mr-c Sep 6, 2018
ce1b75e
bump cwltool version
mr-c Sep 6, 2018
2ae8291
open: add context manager for python2
mr-c Sep 7, 2018
febba4b
add per output directories for remote storage
mr-c Sep 10, 2018
1edced5
make per-CLT folder into per-job folder
mr-c Sep 10, 2018
eeafc3b
grab checksum fix
mr-c Sep 12, 2018
33ca370
use the correct HOME and TMPDIR as per the CWL spec
mr-c Sep 12, 2018
4885d6a
adapt the arvados uploader for cwl-tes
mr-c Sep 13, 2018
8d09fda
properly fail jobs
mr-c Sep 13, 2018
510c405
misc
mr-c Sep 13, 2018
710fbf5
don't double set process_status
mr-c Sep 13, 2018
f91591e
a typo and a fix
mr-c Sep 14, 2018
274baf2
match behaviour of os.path.join
mr-c Sep 14, 2018
877e00c
listdir was just wrong
mr-c Sep 14, 2018
48cb281
ather and evaluate exit_code
mr-c Sep 14, 2018
ceb9bcf
fix exit_code parsing
mr-c Sep 14, 2018
c31a4dd
fix small bug
mr-c Sep 18, 2018
471444c
fix one part, fix the other
mr-c Sep 18, 2018
ab58d18
improved error handling
mr-c Sep 18, 2018
050de41
fix InitialWorkDirRequirement with non-local locations
mr-c Sep 20, 2018
38475de
make abspath more futureproof
mr-c Sep 20, 2018
132fa24
harden exit code checking
mr-c Sep 20, 2018
543461e
fix FTP/remote globbing with "*"
mr-c Sep 21, 2018
5fd5fd3
properly destage files from FTP
mr-c Sep 21, 2018
9a53524
remove unneeded lines
mr-c Sep 21, 2018
0bb1086
grab complimentary cwltool fixes
mr-c Sep 21, 2018
917c338
fix py2 regression
mr-c Sep 24, 2018
62577e5
support dynamic resource requirements
mr-c Sep 25, 2018
5e0ea74
simplify dockerOutputDirectory handling
mr-c Sep 25, 2018
73001fb
another simplification
mr-c Sep 25, 2018
328be3b
skip test with Directory literal input for now
mr-c Sep 25, 2018
5b916e1
typo
mr-c Sep 25, 2018
6d48f96
add Directory upload
mr-c Sep 26, 2018
adbf818
re-fix remote cwl.output.json
mr-c Sep 27, 2018
acf2911
unlimit the number of cores/memory
mr-c Sep 28, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ script:
after_failure:
- echo "FUNNEL LOGS ----------"
- cat ./tests/test_tmp/conformance_test_v1.0_*/funnel_log.txt
branches:
only:
- master
notifications:
email: false
231 changes: 231 additions & 0 deletions cwl_tes/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""FTP support"""
from __future__ import absolute_import

import contextlib
import fnmatch
import ftplib
import logging
import netrc
import glob
import os
from typing import List, Text # noqa F401 # pylint: disable=unused-import

from six import PY2
from six.moves import urllib
from schema_salad.ref_resolver import uri_file_path

from cwltool.stdfsaccess import StdFsAccess
from cwltool.loghandler import _logger


def abspath(src, basedir): # type: (Text, Text) -> Text
"""http(s):, file:, ftp:, and plain path aware absolute path"""
scheme = urllib.parse.urlparse(src).scheme
if scheme == u"file":
apath = Text(uri_file_path(str(src)))
elif scheme:
return src
else:
if basedir.startswith(u"file://"):
apath = src if os.path.isabs(src) else basedir + '/' + src
else:
apath = src if os.path.isabs(src) else os.path.join(basedir, src)
return apath


class FtpFsAccess(StdFsAccess):
"""FTP access with upload."""
def __init__(self, basedir, cache=None): # type: (Text) -> None
super(FtpFsAccess, self).__init__(basedir)
self.cache = cache or {}
self.netrc = None
try:
if 'HOME' in os.environ:
if os.path.exists(os.path.join(os.environ['HOME'], '.netrc')):
self.netrc = netrc.netrc(
os.path.join(os.environ['HOME'], '.netrc'))
elif os.path.exists(os.path.join(os.curdir, '.netrc')):
self.netrc = netrc.netrc(os.path.join(os.curdir, '.netrc'))
except netrc.NetrcParseError as err:
_logger.debug(err)

def _parse_url(self, url):
# type: (Text) -> Tuple[Optional[Text], Optional[Text]]
parse = urllib.parse.urlparse(url)
user = parse.username
passwd = parse.password
host = parse.netloc
path = parse.path
if parse.scheme == 'ftp':
if not user and self.netrc:
creds = self.netrc.authenticators(host)
if creds:
user, _, passwd = creds
if not user:
user = "anonymous"
passwd = "anonymous@"
return host, user, passwd, path

def _connect(self, url): # type: (Text) -> Optional[ftplib.FTP]
parse = urllib.parse.urlparse(url)
if parse.scheme == 'ftp':
host, user, passwd, _ = self._parse_url(url)
if (host, user, passwd) in self.cache:
if self.cache[(host, user, passwd)].pwd():
return self.cache[(host, user, passwd)]
ftp = ftplib.FTP_TLS()
ftp.set_debuglevel(1 if _logger.isEnabledFor(logging.DEBUG) else 0)
ftp.connect(host)
ftp.login(user, passwd)
self.cache[(host, user, passwd)] = ftp
return ftp
return None

def _abs(self, p): # type: (Text) -> Text
return abspath(p, self.basedir)

def glob(self, pattern): # type: (Text) -> List[Text]
if not self.basedir.startswith("ftp:"):
return super(FtpFsAccess, self).glob(pattern)
return self._glob(pattern)

def _glob0(self, basename, basepath):
if basename == '':
if self.isdir(basepath):
return [basename]
else:
if self.isfile(self.join(basepath, basename)):
return [basename]
return []

def _glob1(self, pattern, basepath=None):
try:
names = self.listdir(basepath)
except ftplib.all_errors:
return []
if pattern[0] != '.':
names = filter(lambda x: x[0] != '.', names)
return fnmatch.filter(names, pattern)

def _glob(self, pattern): # type: (Text) -> List[Text]
if pattern.endswith("/."):
pattern = pattern[:-1]
dirname, basename = pattern.rsplit('/', 1)
if not glob.has_magic(pattern):
if basename:
if self.exists(pattern):
return [pattern]
else: # Patterns ending in slash should match only directories
if self.isdir(dirname):
return [pattern]
return []
if not dirname:
return self._glob1(basename)

dirs = self._glob(dirname)
if glob.has_magic(basename):
glob_in_dir = self._glob1
else:
glob_in_dir = self._glob0
results = []
for dirname in dirs:
results.extend(glob_in_dir(basename, dirname))
return results

def open(self, fn, mode):
if not fn.startswith("ftp:"):
return super(FtpFsAccess, self).open(fn, mode)
if 'r' in mode:
host, user, passwd, path = self._parse_url(fn)
handle = urllib.request.urlopen(
"ftp://{}:{}@{}/{}".format(user, passwd, host, path))
if PY2:
return contextlib.closing(handle)
return handle
raise Exception('Write mode FTP not implemented')

def exists(self, fn): # type: (Text) -> bool
if not self.basedir.startswith("ftp:"):
return super(FtpFsAccess, self).exists(fn)
return self.isfile(fn) or self.isdir(fn)

def isfile(self, fn): # type: (Text) -> bool
ftp = self._connect(fn)
if ftp:
try:
self.size(fn)
return True
except ftplib.all_errors:
return False
return super(FtpFsAccess, self).isfile(fn)

def isdir(self, fn): # type: (Text) -> bool
ftp = self._connect(fn)
if ftp:
try:
cwd = ftp.pwd()
ftp.cwd(urllib.parse.urlparse(fn).path)
ftp.cwd(cwd)
return True
except ftplib.all_errors:
return False
return super(FtpFsAccess, self).isdir(fn)

def mkdir(self, url, recursive=True):
"""Make the directory specified in the URL."""
ftp = self._connect(url)
path = urllib.parse.urlparse(url).path
if not recursive:
return ftp.mkd(path)
dirs = [d for d in path.split('/') if d != '']
for index, _ in enumerate(dirs):
try:
ftp.mkd("/".join(dirs[:index+1])+'/')
except ftplib.all_errors:
pass
return None

def listdir(self, fn): # type: (Text) -> List[Text]
ftp = self._connect(fn)
if ftp:
host, _, _, path = self._parse_url(fn)
return ["ftp://{}/{}".format(host, x) for x in ftp.nlst(path)]
return super(FtpFsAccess, self).listdir(fn)

def join(self, path, *paths): # type: (Text, *Text) -> Text
if path.startswith('ftp:'):
result = path
for extra_path in paths:
if extra_path.startswith('ftp:/'):
result = extra_path
else:
result = result + "/" + extra_path
return result
return super(FtpFsAccess, self).join(path, *paths)

def realpath(self, path): # type: (Text) -> Text
if path.startswith('ftp:'):
return path
return os.path.realpath(path)

def size(self, fn):
ftp = self._connect(fn)
if ftp:
host, user, passwd, path = self._parse_url(fn)
try:
return ftp.size(path)
except ftplib.all_errors:
handle = urllib.request.urlopen(
"ftp://{}:{}@{}/{}".format(user, passwd, host, path))
info = handle.info()
handle.close()
if 'Content-length' in info:
return int(info['Content-length'])
return None

return super(FtpFsAccess, self).size(fn)

def upload(self, file_handle, url):
"""FtpFsAccess specific method to upload a file to the given URL."""
ftp = self._connect(url)
ftp.storbinary("STOR {}".format(self._parse_url(url)[3]), file_handle)
Loading