From 4801fecc3b3a1b8d8942e44dbb4ad2b7b3ac360b Mon Sep 17 00:00:00 2001 From: Andrea Pierleoni Date: Sun, 15 Oct 2017 14:39:44 +0100 Subject: [PATCH] Support for the Docker Python SDK (#2158) * added stub for minimal docker wrapper * don't set a name by default * handle None name * changed to use the docker low level api * removed debugging print * refractored names * added a test for single file mount * merged with tempdir support and image download * directory and file mounting working * fixed tests and using busybox when needed * always use busybox * fallback if exception has no message * lowered pull image to debug level * do not declare volumes unless they are passed as extra params * remove name requirement from tmpdir * tag must be specified to get the right number of images back from the low-level API * test to trigger duplicate mount pt issue * fix pid lock in tests * use a wrapper task in test * created an _init_ method to avoid the mutable volumes list to be shared amongst all tasks. also some linting * need args on the init to allow parameters * solves pid lock * remove bad logging lines * fix handling of multiple volumes * must be passing kwargs when using super! * add elipapa as additional author * add a blog post about open targets use and this PR contribution * skip tests if no docker daemon is present * literal should become a bytes literal in Py3 * do not mount tmp dir by default and remove unused import * don't remove tmp directory if it was not created * volume and binds should both be there * use host_config's auto_Remove functionality * docker API v>1.25 required for auto_remove * extended module docs with contribution use case * added version to docker dependency * mount tmp dir by default, and rename volumes to binds * enable docker service in travis ci to allow DockerTask tests to run * fix boto problem in tests --- .travis.yml | 1 + README.rst | 1 + luigi/contrib/docker_runner.py | 245 +++++++++++++++++++++++++++++ test/contrib/docker_runner_test.py | 162 +++++++++++++++++++ tox.ini | 2 + 5 files changed, 411 insertions(+) create mode 100644 luigi/contrib/docker_runner.py create mode 100644 test/contrib/docker_runner_test.py diff --git a/.travis.yml b/.travis.yml index 5c9fcb03e6..300043610c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: python services: - elasticsearch - mysql + - docker env: global: diff --git a/README.rst b/README.rst index 60646e36e5..d8ab25dd6d 100644 --- a/README.rst +++ b/README.rst @@ -145,6 +145,7 @@ or held presentations about Luigi: * `Red Hat - Marketing Operations `_ `(blog, 2017) `__ * `GetNinjas `_ `(blog, 2017) `__ * `voyages-sncf.com `_ `(presentation, 2017) `__ +* `Open Targets `_ `(blog, 2017) `__ Some more companies are using Luigi but haven't had a chance yet to write about it: diff --git a/luigi/contrib/docker_runner.py b/luigi/contrib/docker_runner.py new file mode 100644 index 0000000000..fc2af76a08 --- /dev/null +++ b/luigi/contrib/docker_runner.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017 Open Targets +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" +Docker container wrapper for Luigi. + +Enables running a docker container as a task in luigi. +This wrapper uses the Docker Python SDK to communicate directly with the +Docker API avoiding the common pattern to invoke the docker client +from the command line. Using the SDK it is possible to detect and properly +handle errors occurring when pulling, starting or running the containers. +On top of this, it is possible to mount a single file in the container +and a temporary directory is created on the host and mounted allowing +the handling of files bigger than the container limit. + +Requires: + +- docker: ``pip install docker`` + +Written and maintained by Andrea Pierleoni (@apierleoni). +Contributions by Eliseo Papa (@elipapa). +""" +from tempfile import mkdtemp +import logging +import luigi + +from luigi.local_target import LocalFileSystem +from luigi import six + +logger = logging.getLogger('luigi-interface') + +try: + import docker + from docker.errors import ContainerError, ImageNotFound, APIError + +except ImportError: + logger.warning('docker is not installed. DockerTask requires docker.') + docker = None + +# TODO: may need to implement this logic for remote hosts +# class dockerconfig(luigi.Config): +# ''' +# this class allows to use the luigi.cfg file to specify the path to the docker config.json. +# The docker client should look by default in the main directory, +# but on different systems this may need to be specified. +# ''' +# docker_config_path = luigi.Parameter( +# default="~/.docker/config.json", +# description="Path to dockercfg file for authentication") + + +class DockerTask(luigi.Task): + + @property + def image(self): + return 'alpine' + + @property + def command(self): + return "echo hello world" + + @property + def name(self): + return None + + @property + def container_options(self): + return {} + + @property + def environment(self): + return {} + + @property + def container_tmp_dir(self): + return '/tmp/luigi' + + @property + def binds(self): + ''' + Override this to mount local volumes, in addition to the /tmp/luigi + which gets defined by default. This should return a list of strings. + e.g. ['/hostpath1:/containerpath1', '/hostpath2:/containerpath2'] + ''' + return None + + @property + def network_mode(self): + return '' + + @property + def docker_url(self): + return None + + @property + def auto_remove(self): + return True + + @property + def force_pull(self): + return False + + @property + def mount_tmp(self): + return True + + def __init__(self, *args, **kwargs): + ''' + When a new instance of the DockerTask class gets created: + - call the parent class __init__ method + - start the logger + - init an instance of the docker client + - create a tmp dir + - add the temp dir to the volume binds specified in the task + ''' + super(DockerTask, self).__init__(*args, **kwargs) + self.__logger = logger + + '''init docker client + using the low level API as the higher level API does not allow to mount single + files as volumes + ''' + self._client = docker.APIClient(self.docker_url) + + # add latest tag if nothing else is specified by task + if ':' not in self.image: + self._image = ':'.join([self.image, 'latest']) + else: + self._image = self.image + + if self.mount_tmp: + # create a tmp_dir, NOTE: /tmp needs to be specified for it to work on + # macOS, despite what the python documentation says + self._host_tmp_dir = mkdtemp(suffix=self.task_id, + prefix='luigi-docker-tmp-dir-', + dir='/tmp') + + self._binds = ['{0}:{1}'.format(self._host_tmp_dir, self.container_tmp_dir)] + else: + self._binds = [] + + # update environment property with the (internal) location of tmp_dir + self.environment['LUIGI_TMP_DIR'] = self.container_tmp_dir + + # add additional volume binds specified by the user to the tmp_Dir bind + if isinstance(self.binds, six.string_types): + self._binds.append(self.binds) + elif isinstance(self.binds, list): + self._binds.extend(self.binds) + + # derive volumes (ie. list of container destination paths) from + # specified binds + self._volumes = [b.split(':')[1] for b in self._binds] + + def run(self): + + # get image if missing + if self.force_pull or len(self._client.images(name=self._image)) == 0: + logger.info('Pulling docker image ' + self._image) + try: + for logline in self._client.pull(self._image, stream=True): + logger.debug(logline.decode('utf-8')) + except APIError as e: + self.__logger.warning("Error in Docker API: " + e.explanation) + raise + + # remove clashing container if a container with the same name exists + if self.auto_remove and self.name: + try: + self._client.remove_container(self.name, + force=True) + except APIError as e: + self.__logger.warning("Ignored error in Docker API: " + e.explanation) + + # run the container + try: + logger.debug('Creating image: %s command: %s volumes: %s' + % (self._image, self.command, self._binds)) + + host_config = self._client.create_host_config(binds=self._binds, + network_mode=self.network_mode) + + container = self._client.create_container(self._image, + command=self.command, + name=self.name, + environment=self.environment, + volumes=self._volumes, + host_config=host_config, + **self.container_options) + self._client.start(container['Id']) + + exit_status = self._client.wait(container['Id']) + if exit_status != 0: + stdout = False + stderr = True + error = self._client.logs(container['Id'], + stdout=stdout, + stderr=stderr) + if self.auto_remove: + try: + self._client.remove_container(container['Id']) + except docker.errors.APIError: + self.__logger.warning("Container " + container['Id'] + + " could not be removed") + if exit_status != 0: + raise ContainerError(container, exit_status, self.command, self._image, error) + + except ContainerError as e: + # catch non zero exti status and return it + container_name = '' + if self.name: + container_name = self.name + try: + message = e.message + except: + message = str(e) + self.__logger.error("Container " + container_name + + " exited with non zero code: " + message) + raise + except ImageNotFound as e: + self.__logger.error("Image " + self._image + " not found") + raise + except APIError as e: + self.__logger.error("Error in Docker API: "+e.explanation) + raise + + # delete temp dir + filesys = LocalFileSystem() + if self.mount_tmp and filesys.exists(self._host_tmp_dir): + filesys.remove(self._host_tmp_dir, recursive=True) diff --git a/test/contrib/docker_runner_test.py b/test/contrib/docker_runner_test.py new file mode 100644 index 0000000000..1a43f9e6a5 --- /dev/null +++ b/test/contrib/docker_runner_test.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017 Open Targets +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" +Tests for Docker container wrapper for Luigi. + + +Requires: + +- docker: ``pip install docker`` + +Written and maintained by Andrea Pierleoni (@apierleoni). +Contributions by Eliseo Papa (@elipapa) +""" +import tempfile +from helpers import unittest +from tempfile import NamedTemporaryFile + +import luigi +import logging +from luigi.contrib.docker_runner import DockerTask + +logger = logging.getLogger('luigi-interface') + +try: + import docker + from docker.errors import ContainerError, ImageNotFound + client = docker.from_env() + client.version() +except ImportError: + raise unittest.SkipTest('Unable to load docker module') +except Exception: + raise unittest.SkipTest('Unable to connect to docker daemon') + +tempfile.tempdir = '/tmp' # set it explicitely to make it work out of the box in mac os +local_file = NamedTemporaryFile() +local_file.write(b'this is a test file\n') +local_file.flush() + + +class SuccessJob(DockerTask): + image = "busybox:latest" + name = "SuccessJob" + + +class FailJobImageNotFound(DockerTask): + image = "image-does-not-exists" + name = "FailJobImageNotFound" + + +class FailJobContainer(DockerTask): + image = "busybox" + name = "FailJobContainer" + command = 'cat this-file-does-not-exist' + + +class WriteToTmpDir(DockerTask): + image = "busybox" + name = "WriteToTmpDir" + container_tmp_dir = '/tmp/luigi-test' + command = 'test -d /tmp/luigi-test' + # command = 'test -d $LUIGI_TMP_DIR'# && echo ok >$LUIGI_TMP_DIR/test' + + +class MountLocalFileAsVolume(DockerTask): + image = "busybox" + name = "MountLocalFileAsVolume" + # volumes= {'/tmp/local_file_test': {'bind': local_file.name, 'mode': 'rw'}} + binds = [local_file.name + ':/tmp/local_file_test'] + command = 'test -f /tmp/local_file_test' + + +class MountLocalFileAsVolumeWithParam(DockerTask): + dummyopt = luigi.Parameter() + image = "busybox" + name = "MountLocalFileAsVolumeWithParam" + binds = [local_file.name + ':/tmp/local_file_test'] + command = 'test -f /tmp/local_file_test' + + +class MountLocalFileAsVolumeWithParamRedefProperties(DockerTask): + dummyopt = luigi.Parameter() + image = "busybox" + name = "MountLocalFileAsVolumeWithParamRedef" + + @property + def binds(self): + return [local_file.name + ':/tmp/local_file_test' + self.dummyopt] + + @property + def command(self): + return 'test -f /tmp/local_file_test' + self.dummyopt + + def complete(self): + return True + + +class MultipleDockerTask(luigi.WrapperTask): + '''because the volumes property is defined as a list, spinning multiple + containers led to conflict in the volume binds definition, with multiple + host directories pointing to the same container directory''' + def requires(self): + return [MountLocalFileAsVolumeWithParam(dummyopt=opt) + for opt in ['one', 'two', 'three']] + + +class MultipleDockerTaskRedefProperties(luigi.WrapperTask): + def requires(self): + return [MountLocalFileAsVolumeWithParamRedefProperties(dummyopt=opt) + for opt in ['one', 'two', 'three']] + + +class TestDockerTask(unittest.TestCase): + + # def tearDown(self): + # local_file.close() + + def test_success_job(self): + success = SuccessJob() + luigi.build([success], local_scheduler=True) + self.assertTrue(success) + + def test_temp_dir_creation(self): + writedir = WriteToTmpDir() + writedir.run() + + def test_local_file_mount(self): + localfile = MountLocalFileAsVolume() + localfile.run() + + def test_fail_job_image_not_found(self): + fail = FailJobImageNotFound() + self.assertRaises(ImageNotFound, fail.run) + + def test_fail_job_container(self): + fail = FailJobContainer() + self.assertRaises(ContainerError, fail.run) + + def test_multiple_jobs(self): + worked = MultipleDockerTask() + luigi.build([worked], local_scheduler=True) + self.assertTrue(worked) + + def test_multiple_jobs2(self): + worked = MultipleDockerTaskRedefProperties() + luigi.build([worked], local_scheduler=True) + self.assertTrue(worked) diff --git a/tox.ini b/tox.ini index ca8ce171a6..ea1eaa0221 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ deps= moto<1.0 HTTPretty==0.8.10 nose<2.0 + docker>=2.1.0 unittest2<2.0 boto<3.0 sqlalchemy<2.0 @@ -23,6 +24,7 @@ deps= py27-gcloud: avro py33-gcloud,py34-gcloud,py35-gcloud,py36-gcloud: avro-python3 gcloud: oauth2client<4.0.0 + google-compute-engine coverage>=4.1,<4.2 codecov>=1.4.0 requests<3.0