Skip to content

Commit

Permalink
Move non-core like modules to luigi.contrib
Browse files Browse the repository at this point in the history
While it's of course a bit subjective that what is core and not. I
settled on choosing these modules.  I found the rest either to be kind
of core or very hard to move and eventually create seperate packages
for.

I conciously did not create an `luigi/mrrunner.py` importing contrib, as
it will not do as one expects as you typically will use `.__file__` on
it to ship it with your map reduce job.
  • Loading branch information
Tarrasch committed Feb 2, 2017
1 parent 69f0974 commit 97c38b6
Show file tree
Hide file tree
Showing 11 changed files with 1,205 additions and 1,152 deletions.
10 changes: 5 additions & 5 deletions luigi/contrib/hadoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import luigi.task
import luigi.contrib.gcs
import luigi.contrib.hdfs
import luigi.s3
from luigi import mrrunner
import luigi.contrib.s3
from luigi.contrib import mrrunner

if six.PY2:
from itertools import imap as map
Expand Down Expand Up @@ -460,7 +460,7 @@ def run_job(self, job, tracking_url_callback=None):
# atomic output: replace output with a temporary work directory
if self.end_job_with_atomic_move_dir:
illegal_targets = (
luigi.s3.S3FlagTarget, luigi.contrib.gcs.GCSFlagTarget)
luigi.contrib.s3.S3FlagTarget, luigi.contrib.gcs.GCSFlagTarget)
if isinstance(job.output(), illegal_targets):
raise TypeError("end_job_with_atomic_move_dir is not supported"
" for {}".format(illegal_targets))
Expand Down Expand Up @@ -533,7 +533,7 @@ def run_job(self, job, tracking_url_callback=None):

allowed_input_targets = (
luigi.contrib.hdfs.HdfsTarget,
luigi.s3.S3Target,
luigi.contrib.s3.S3Target,
luigi.contrib.gcs.GCSTarget)
for target in luigi.task.flatten(job.input_hadoop()):
if not isinstance(target, allowed_input_targets):
Expand All @@ -543,7 +543,7 @@ def run_job(self, job, tracking_url_callback=None):

allowed_output_targets = (
luigi.contrib.hdfs.HdfsTarget,
luigi.s3.S3FlagTarget,
luigi.contrib.s3.S3FlagTarget,
luigi.contrib.gcs.GCSFlagTarget)
if not isinstance(job.output(), allowed_output_targets):
raise TypeError('output must be one of: {}'.format(
Expand Down
File renamed without changes.
Loading

0 comments on commit 97c38b6

Please sign in to comment.