diff --git a/doc/parameters.rst b/doc/parameters.rst index 1a4a8a721b..6dca716c30 100644 --- a/doc/parameters.rst +++ b/doc/parameters.rst @@ -88,6 +88,25 @@ are not the same instance: >>> hash(c) == hash(d) True +Parameter visibility +^^^^^^^^^^^^^^^^^^^^ + +Using :class:`~luigi.parameter.ParameterVisibility` you can configure parameter visibility. By default, all +parameters are public, but you can also set them hidden or private. + +.. code:: python + + >>> import luigi + >>> from luigi.parameter import ParameterVisibility + + >>> luigi.Parameter(visibility=ParameterVisibility.PRIVATE) + +``ParameterVisibility.PUBLIC`` (default) - visible everywhere + +``ParameterVisibility.HIDDEN`` - ignored in WEB-view, but saved into database if save db_history is true + +``ParameterVisibility.PRIVATE`` - visible only inside task. + Parameter types ^^^^^^^^^^^^^^^ diff --git a/luigi/parameter.py b/luigi/parameter.py index f619864090..4c4c3853a0 100644 --- a/luigi/parameter.py +++ b/luigi/parameter.py @@ -23,6 +23,7 @@ import abc import datetime import warnings +from enum import IntEnum import json from json import JSONEncoder from collections import OrderedDict, Mapping @@ -40,10 +41,26 @@ from luigi import configuration from luigi.cmdline_parser import CmdlineParser - _no_value = object() +class ParameterVisibility(IntEnum): + """ + Possible values for the parameter visibility option. Public is the default. + See :doc:`/parameters` for more info. + """ + PUBLIC = 0 + HIDDEN = 1 + PRIVATE = 2 + + @classmethod + def has_value(cls, value): + return any(value == item.value for item in cls) + + def serialize(self): + return self.value + + class ParameterException(Exception): """ Base exception. @@ -113,7 +130,8 @@ def run(self): _counter = 0 # non-atomically increasing counter used for ordering parameters. def __init__(self, default=_no_value, is_global=False, significant=True, description=None, - config_path=None, positional=True, always_in_help=False, batch_method=None): + config_path=None, positional=True, always_in_help=False, batch_method=None, + visibility=ParameterVisibility.PUBLIC): """ :param default: the default value for this parameter. This should match the type of the Parameter, i.e. ``datetime.date`` for ``DateParameter`` or ``int`` for @@ -140,6 +158,10 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip parameter values into a single value. Used when receiving batched parameter lists from the scheduler. See :ref:`batch_method` + + :param visibility: A Parameter whose value is a :py:class:`~luigi.parameter.ParameterVisibility`. + Default value is ParameterVisibility.PUBLIC + """ self._default = default self._batch_method = batch_method @@ -150,6 +172,7 @@ def __init__(self, default=_no_value, is_global=False, significant=True, descrip positional = False self.significant = significant # Whether different values for this parameter will differentiate otherwise equal tasks self.positional = positional + self.visibility = visibility if ParameterVisibility.has_value(visibility) else ParameterVisibility.PUBLIC self.description = description self.always_in_help = always_in_help @@ -195,11 +218,11 @@ def _value_iterator(self, task_name, param_name): yield (self._get_value_from_config(task_name, param_name), None) yield (self._get_value_from_config(task_name, param_name.replace('_', '-')), 'Configuration [{}] {} (with dashes) should be avoided. Please use underscores.'.format( - task_name, param_name)) + task_name, param_name)) if self._config_path: yield (self._get_value_from_config(self._config_path['section'], self._config_path['name']), 'The use of the configuration [{}] {} is deprecated. Please use [{}] {}'.format( - self._config_path['section'], self._config_path['name'], task_name, param_name)) + self._config_path['section'], self._config_path['name'], task_name, param_name)) yield (self._default, None) def has_task_value(self, task_name, param_name): @@ -689,6 +712,7 @@ class DateIntervalParameter(Parameter): (eg. "2015-W35"). In addition, it also supports arbitrary date intervals provided as two dates separated with a dash (eg. "2015-11-04-2015-12-04"). """ + def parse(self, s): """ Parses a :py:class:`~luigi.date_interval.DateInterval` from the input. @@ -740,8 +764,10 @@ def field(key): def optional_field(key): return "(%s)?" % field(key) + # A little loose: ISO 8601 does not allow weeks in combination with other fields, but this regex does (as does python timedelta) - regex = "P(%s|%s(T%s)?)" % (field("weeks"), optional_field("days"), "".join([optional_field(key) for key in ["hours", "minutes", "seconds"]])) + regex = "P(%s|%s(T%s)?)" % (field("weeks"), optional_field("days"), + "".join([optional_field(key) for key in ["hours", "minutes", "seconds"]])) return self._apply_regex(regex, input) def _parseSimple(self, input): @@ -905,6 +931,7 @@ class _DictParamEncoder(JSONEncoder): """ JSON encoder for :py:class:`~DictParameter`, which makes :py:class:`~_FrozenOrderedDict` JSON serializable. """ + def default(self, obj): if isinstance(obj, _FrozenOrderedDict): return obj.get_wrapped() @@ -943,6 +970,7 @@ def run(self): tags, that are dynamically constructed outside Luigi), or you have a complex parameter containing logically related values (like a database connection config). """ + def normalize(self, value): """ Ensure that dictionary parameter is converted to a _FrozenOrderedDict so it can be hashed. @@ -996,6 +1024,7 @@ def run(self): $ luigi --module my_tasks MyTask --grades '[100,70]' """ + def normalize(self, x): """ Ensure that struct is recursively converted to a tuple so it can be hashed. @@ -1053,6 +1082,7 @@ def run(self): $ luigi --module my_tasks MyTask --book_locations '((12,3),(4,15),(52,1))' """ + def parse(self, x): """ Parse an individual value from the input. @@ -1100,6 +1130,7 @@ class MyTask(luigi.Task): $ luigi --module my_tasks MyTask --my-param-1 -3 --my-param-2 -2 """ + def __init__(self, left_op=operator.le, right_op=operator.lt, *args, **kwargs): """ :param function var_type: The type of the input variable, e.g. int or float. @@ -1178,6 +1209,7 @@ class MyTask(luigi.Task): same type and transparency of parameter value on the command line is desired. """ + def __init__(self, var_type=str, *args, **kwargs): """ :param function var_type: The type of the input variable, e.g. str, int, diff --git a/luigi/scheduler.py b/luigi/scheduler.py index b7993c760b..fbc01a838d 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -49,6 +49,7 @@ from luigi.task_status import DISABLED, DONE, FAILED, PENDING, RUNNING, SUSPENDED, UNKNOWN, \ BATCH_RUNNING from luigi.task import Config +from luigi.parameter import ParameterVisibility logger = logging.getLogger(__name__) @@ -280,7 +281,7 @@ def __eq__(self, other): class Task(object): def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None, - params=None, accepts_messages=False, tracking_url=None, status_message=None, + params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional'): self.id = task_id self.stakeholders = set() # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active) @@ -301,8 +302,11 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='', self.resources = _get_default(resources, {}) self.family = family self.module = module - self.params = _get_default(params, {}) - + self.param_visibilities = _get_default(param_visibilities, {}) + self.params = {} + self.public_params = {} + self.hidden_params = {} + self.set_params(params) self.accepts_messages = accepts_messages self.retry_policy = retry_policy self.failures = Failures(self.retry_policy.disable_window) @@ -318,6 +322,13 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='', def __repr__(self): return "Task(%r)" % vars(self) + def set_params(self, params): + self.params = _get_default(params, {}) + self.public_params = {key: value for key, value in self.params.items() if + self.param_visibilities.get(key, ParameterVisibility.PUBLIC) == ParameterVisibility.PUBLIC} + self.hidden_params = {key: value for key, value in self.params.items() if + self.param_visibilities.get(key, ParameterVisibility.PUBLIC) == ParameterVisibility.HIDDEN} + # TODO(2017-08-10) replace this function with direct calls to batchable # this only exists for backward compatibility def is_batchable(self): @@ -343,7 +354,7 @@ def has_excessive_failures(self): @property def pretty_id(self): - param_str = ', '.join(u'{}={}'.format(key, value) for key, value in sorted(self.params.items())) + param_str = ', '.join(u'{}={}'.format(key, value) for key, value in sorted(self.public_params.items())) return u'{}({})'.format(self.family, param_str) @@ -778,7 +789,7 @@ def forgive_failures(self, task_id=None): @rpc_method() def add_task(self, task_id=None, status=PENDING, runnable=True, deps=None, new_deps=None, expl=None, resources=None, - priority=0, family='', module=None, params=None, accepts_messages=False, + priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs): """ @@ -802,7 +813,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True, if worker.enabled: _default_task = self._make_task( task_id=task_id, status=PENDING, deps=deps, resources=resources, - priority=priority, family=family, module=module, params=params, + priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities, ) else: _default_task = None @@ -817,8 +828,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True, task.family = family if not getattr(task, 'module', None): task.module = module + if not task.param_visibilities: + task.param_visibilities = _get_default(param_visibilities, {}) if not task.params: - task.params = _get_default(params, {}) + task.set_params(params) if batch_id is not None: task.batch_id = batch_id @@ -1272,6 +1285,7 @@ def _upstream_status(self, task_id, upstream_status_table): def _serialize_task(self, task_id, include_deps=True, deps=None): task = self._state.get_task(task_id) + ret = { 'display_name': task.pretty_id, 'status': task.status, @@ -1280,7 +1294,7 @@ def _serialize_task(self, task_id, include_deps=True, deps=None): 'time_running': getattr(task, "time_running", None), 'start_time': task.time, 'last_updated': getattr(task, "updated", task.time), - 'params': task.params, + 'params': task.public_params, 'name': task.family, 'priority': task.priority, 'resources': task.resources, diff --git a/luigi/task.py b/luigi/task.py index 4340e513dc..08f27b8179 100644 --- a/luigi/task.py +++ b/luigi/task.py @@ -39,6 +39,7 @@ from luigi import parameter from luigi.task_register import Register +from luigi.parameter import ParameterVisibility Parameter = parameter.Parameter logger = logging.getLogger('luigi-interface') @@ -441,7 +442,7 @@ def __init__(self, *args, **kwargs): self.param_kwargs = dict(param_values) self._warn_on_wrong_param_types() - self.task_id = task_id_str(self.get_task_family(), self.to_str_params(only_significant=True)) + self.task_id = task_id_str(self.get_task_family(), self.to_str_params(only_significant=True, only_public=True)) self.__hash = hash(self.task_id) self.set_tracking_url = None @@ -482,18 +483,29 @@ def from_str_params(cls, params_str): return cls(**kwargs) - def to_str_params(self, only_significant=False): + def to_str_params(self, only_significant=False, only_public=False): """ Convert all parameters to a str->str hash. """ params_str = {} params = dict(self.get_params()) for param_name, param_value in six.iteritems(self.param_kwargs): - if (not only_significant) or params[param_name].significant: + if (((not only_significant) or params[param_name].significant) + and ((not only_public) or params[param_name].visibility == ParameterVisibility.PUBLIC) + and params[param_name].visibility != ParameterVisibility.PRIVATE): params_str[param_name] = params[param_name].serialize(param_value) return params_str + def _get_param_visibilities(self): + param_visibilities = {} + params = dict(self.get_params()) + for param_name, param_value in six.iteritems(self.param_kwargs): + if params[param_name].visibility != ParameterVisibility.PRIVATE: + param_visibilities[param_name] = params[param_name].visibility.serialize() + + return param_visibilities + def clone(self, cls=None, **kwargs): """ Creates a new instance from an existing instance where some of the args have changed. diff --git a/luigi/worker.py b/luigi/worker.py index 5c76bbc3de..54954765e2 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -565,6 +565,9 @@ def _add_task(self, *args, **kwargs): for batch_task in self._batch_running_tasks.pop(task_id): self._add_task_history.append((batch_task, status, True)) + if task and kwargs.get('params'): + kwargs['param_visibilities'] = task._get_param_visibilities() + self._scheduler.add_task(*args, **kwargs) logger.info('Informed scheduler that task %s has status %s', task_id, status) diff --git a/setup.py b/setup.py index 176d671636..89cffbcb1d 100644 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ # the License. import os +import sys from setuptools import setup @@ -48,6 +49,9 @@ def get_static_files(path): install_requires.remove('python-daemon<3.0') install_requires.append('sphinx>=1.4.4') # Value mirrored in doc/conf.py +if sys.version_info < (3, 4): + install_requires.append('enum34>1.1.0') + setup( name='luigi', version='2.7.6', diff --git a/test/db_task_history_test.py b/test/db_task_history_test.py index 8b162d282e..d302bed292 100644 --- a/test/db_task_history_test.py +++ b/test/db_task_history_test.py @@ -24,6 +24,7 @@ from luigi.db_task_history import DbTaskHistory from luigi.task_status import DONE, PENDING, RUNNING import luigi.scheduler +from luigi.parameter import ParameterVisibility class DummyTask(luigi.Task): @@ -32,7 +33,8 @@ class DummyTask(luigi.Task): class ParamTask(luigi.Task): param1 = luigi.Parameter() - param2 = luigi.IntParameter() + param2 = luigi.IntParameter(visibility=ParameterVisibility.HIDDEN) + param3 = luigi.Parameter(default="empty", visibility=ParameterVisibility.PRIVATE) class DbTaskHistoryTest(unittest.TestCase): diff --git a/test/scheduler_parameter_visibilities_test.py b/test/scheduler_parameter_visibilities_test.py new file mode 100644 index 0000000000..b3cae1f579 --- /dev/null +++ b/test/scheduler_parameter_visibilities_test.py @@ -0,0 +1,120 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2012-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from helpers import LuigiTestCase, RunOnceTask +import server_test + +import luigi +import luigi.scheduler +import luigi.worker +from luigi.parameter import ParameterVisibility +import json +import time + + +class SchedulerParameterVisibilitiesTest(LuigiTestCase): + def test_task_with_deps(self): + s = luigi.scheduler.Scheduler(send_messages=True) + with luigi.worker.Worker(scheduler=s) as w: + class DynamicTask(RunOnceTask): + dynamic_public = luigi.Parameter(default="dynamic_public") + dynamic_hidden = luigi.Parameter(default="dynamic_hidden", visibility=ParameterVisibility.HIDDEN) + dynamic_private = luigi.Parameter(default="dynamic_private", visibility=ParameterVisibility.PRIVATE) + + class RequiredTask(RunOnceTask): + required_public = luigi.Parameter(default="required_param") + required_hidden = luigi.Parameter(default="required_hidden", visibility=ParameterVisibility.HIDDEN) + required_private = luigi.Parameter(default="required_private", visibility=ParameterVisibility.PRIVATE) + + class Task(RunOnceTask): + a = luigi.Parameter(default="a") + b = luigi.Parameter(default="b", visibility=ParameterVisibility.HIDDEN) + c = luigi.Parameter(default="c", visibility=ParameterVisibility.PRIVATE) + d = luigi.Parameter(default="d", visibility=ParameterVisibility.PUBLIC) + + def requires(self): + return required_task + + def run(self): + yield dynamic_task + + dynamic_task = DynamicTask() + required_task = RequiredTask() + task = Task() + + w.add(task) + w.run() + + time.sleep(1) + task_deps = s.dep_graph(task_id=task.task_id) + required_task_deps = s.dep_graph(task_id=required_task.task_id) + dynamic_task_deps = s.dep_graph(task_id=dynamic_task.task_id) + + self.assertEqual('Task(a=a, d=d)', task_deps[task.task_id]['display_name']) + self.assertEqual('RequiredTask(required_public=required_param)', + required_task_deps[required_task.task_id]['display_name']) + self.assertEqual('DynamicTask(dynamic_public=dynamic_public)', + dynamic_task_deps[dynamic_task.task_id]['display_name']) + + self.assertEqual({'a': 'a', 'd': 'd'}, task_deps[task.task_id]['params']) + self.assertEqual({'required_public': 'required_param'}, + required_task_deps[required_task.task_id]['params']) + self.assertEqual({'dynamic_public': 'dynamic_public'}, + dynamic_task_deps[dynamic_task.task_id]['params']) + + def test_public_and_hidden_params(self): + s = luigi.scheduler.Scheduler(send_messages=True) + with luigi.worker.Worker(scheduler=s) as w: + class Task(RunOnceTask): + a = luigi.Parameter(default="a") + b = luigi.Parameter(default="b", visibility=ParameterVisibility.HIDDEN) + c = luigi.Parameter(default="c", visibility=ParameterVisibility.PRIVATE) + d = luigi.Parameter(default="d", visibility=ParameterVisibility.PUBLIC) + + task = Task() + + w.add(task) + w.run() + + time.sleep(1) + t = s._state.get_task(task.task_id) + self.assertEqual({'b': 'b'}, t.hidden_params) + self.assertEqual({'a': 'a', 'd': 'd'}, t.public_params) + self.assertEqual({'a': 0, 'b': 1, 'd': 0}, t.param_visibilities) + + +class Task(RunOnceTask): + a = luigi.Parameter(default="a") + b = luigi.Parameter(default="b", visibility=ParameterVisibility.HIDDEN) + c = luigi.Parameter(default="c", visibility=ParameterVisibility.PRIVATE) + d = luigi.Parameter(default="d", visibility=ParameterVisibility.PUBLIC) + + +class RemoteSchedulerParameterVisibilitiesTest(server_test.ServerTestBase): + def test_public_params(self): + task = Task() + luigi.build(tasks=[task], workers=2, scheduler_port=self.get_http_port()) + + time.sleep(1) + + response = self.fetch('/api/graph') + + body = response.body + decoded = body.decode('utf8').replace("'", '"') + data = json.loads(decoded) + + self.assertEqual({'a': 'a', 'd': 'd'}, data['response'][task.task_id]['params']) diff --git a/test/visible_parameters_test.py b/test/visible_parameters_test.py new file mode 100644 index 0000000000..e644aa7cb0 --- /dev/null +++ b/test/visible_parameters_test.py @@ -0,0 +1,95 @@ +import luigi +from luigi.parameter import ParameterVisibility +from helpers import unittest +import json + + +class TestTask1(luigi.Task): + param_one = luigi.Parameter(default='1', visibility=ParameterVisibility.HIDDEN, significant=True) + param_two = luigi.Parameter(default='2', significant=True) + param_three = luigi.Parameter(default='3', visibility=ParameterVisibility.PRIVATE, significant=True) + + +class TestTask2(luigi.Task): + param_one = luigi.Parameter(default='1', visibility=ParameterVisibility.PRIVATE) + param_two = luigi.Parameter(default='2', visibility=ParameterVisibility.PRIVATE) + param_three = luigi.Parameter(default='3', visibility=ParameterVisibility.PRIVATE) + + +class TestTask3(luigi.Task): + param_one = luigi.Parameter(default='1', visibility=ParameterVisibility.HIDDEN, significant=True) + param_two = luigi.Parameter(default='2', visibility=ParameterVisibility.HIDDEN, significant=False) + param_three = luigi.Parameter(default='3', visibility=ParameterVisibility.HIDDEN, significant=True) + + +class TestTask4(luigi.Task): + param_one = luigi.Parameter(default='1', visibility=ParameterVisibility.PUBLIC, significant=True) + param_two = luigi.Parameter(default='2', visibility=ParameterVisibility.PUBLIC, significant=False) + param_three = luigi.Parameter(default='3', visibility=ParameterVisibility.PUBLIC, significant=True) + + +class Test(unittest.TestCase): + def test_to_str_params(self): + task = TestTask1() + + self.assertEqual(task.to_str_params(), {'param_one': '1', 'param_two': '2'}) + + task = TestTask2() + + self.assertEqual(task.to_str_params(), {}) + + task = TestTask3() + + self.assertEqual(task.to_str_params(), {'param_one': '1', 'param_two': '2', 'param_three': '3'}) + + def test_all_public_equals_all_hidden(self): + hidden = TestTask3() + public = TestTask4() + + self.assertEqual(public.to_str_params(), hidden.to_str_params()) + + def test_all_public_equals_all_hidden_using_significant(self): + hidden = TestTask3() + public = TestTask4() + + self.assertEqual(public.to_str_params(only_significant=True), hidden.to_str_params(only_significant=True)) + + def test_private_params_and_significant(self): + task = TestTask1() + + self.assertEqual(task.to_str_params(), task.to_str_params(only_significant=True)) + + def test_param_visibilities(self): + task = TestTask1() + + self.assertEqual(task._get_param_visibilities(), {'param_one': 1, 'param_two': 0}) + + def test_incorrect_visibility_value(self): + class Task(luigi.Task): + a = luigi.Parameter(default='val', visibility=5) + + task = Task() + + self.assertEqual(task._get_param_visibilities(), {'a': 0}) + + def test_task_id_exclude_hidden_and_private_params(self): + task = TestTask1() + + self.assertEqual({'param_two': '2'}, task.to_str_params(only_public=True)) + + def test_json_dumps(self): + public = json.dumps(ParameterVisibility.PUBLIC.serialize()) + hidden = json.dumps(ParameterVisibility.HIDDEN.serialize()) + private = json.dumps(ParameterVisibility.PRIVATE.serialize()) + + self.assertEqual('0', public) + self.assertEqual('1', hidden) + self.assertEqual('2', private) + + public = json.loads(public) + hidden = json.loads(hidden) + private = json.loads(private) + + self.assertEqual(0, public) + self.assertEqual(1, hidden) + self.assertEqual(2, private) diff --git a/tox.ini b/tox.ini index 4876423fba..827000fb12 100644 --- a/tox.ini +++ b/tox.ini @@ -110,6 +110,7 @@ deps = boto3 Sphinx>=1.4.4,<1.5 sphinx_rtd_theme + enum34>1.1.0 commands = # build API docs sphinx-apidoc -o doc/api -T luigi --separate