diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index edbde8f4..c143f1ca 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -11,6 +11,7 @@ import com.hadoop.mapreduce.LzoSplitRecordReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -25,6 +26,16 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class); + /** + * Override the default job name which is generated from the arguments. + */ + public static final String JOB_NAME_KEY = "lzo.indexer.distributed.job.name"; + /** + * Override the default length to which the job name will be truncated. Set non-positive to disable. + */ + public static final String JOB_NAME_MAX_LENGTH_KEY = "lzo.indexer.distributed.job.name.max.length"; + static final String DEFAULT_JOB_NAME_PREFIX = "Distributed Lzo Indexer"; + private static final int DEFAULT_JOB_NAME_MAX_LENGTH = 200; private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); private final PathFilter nonTemporaryFilter = new PathFilter() { @@ -66,6 +77,20 @@ private void walkPath(Path path, PathFilter pathFilter, List accumulator) } } + static void setJobName(Job job, String[] args) { + final Configuration conf = job.getConfiguration(); + + String name = conf.get(JOB_NAME_KEY, DEFAULT_JOB_NAME_PREFIX + " " + Arrays.toString(args)); + + final int maxLength = conf.getInt(JOB_NAME_MAX_LENGTH_KEY, DEFAULT_JOB_NAME_MAX_LENGTH); + + if (maxLength > 0 && name.length() > maxLength) { + name = name.substring(0, maxLength) + "..."; + } + + job.setJobName(name); + } + public int run(String[] args) throws Exception { if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { printUsage(); @@ -85,7 +110,7 @@ public int run(String[] args) throws Exception { } Job job = new Job(getConf()); - job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); + setJobName(job, args); job.setOutputKeyClass(Path.class); job.setOutputValueClass(LongWritable.class); diff --git a/src/test/java/com/hadoop/compression/lzo/TestDistLzoIndexerJobName.java b/src/test/java/com/hadoop/compression/lzo/TestDistLzoIndexerJobName.java new file mode 100644 index 00000000..a697b446 --- /dev/null +++ b/src/test/java/com/hadoop/compression/lzo/TestDistLzoIndexerJobName.java @@ -0,0 +1,89 @@ +package com.hadoop.compression.lzo; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; + +public class TestDistLzoIndexerJobName extends TestCase { + + public void testDefaultName() throws Exception { + String[] args = new String[]{ + "hdfs://cluster/user/test/output/file-m-00000.lzo", + }; + + Job job = new Job(new Configuration(false)); + DistributedLzoIndexer.setJobName(job, args); + + String expected = DistributedLzoIndexer.DEFAULT_JOB_NAME_PREFIX + " [hdfs://cluster/user/test/output/file-m-00000.lzo]"; + + assertEquals(expected, job.getJobName()); + } + + public void testCustomName() throws Exception { + String[] args = new String[]{ + "ignored", + }; + String customName = "--"; + + Configuration conf = new Configuration(false); + conf.set(DistributedLzoIndexer.JOB_NAME_KEY, customName); + Job job = new Job(conf); + DistributedLzoIndexer.setJobName(job, args); + + assertEquals(customName, job.getJobName()); + } + + public void testCustomNameTruncation() throws Exception { + String[] args = new String[]{ + "ignored", + }; + + Configuration conf = new Configuration(false); + conf.set(DistributedLzoIndexer.JOB_NAME_KEY, "123456789"); + conf.setInt(DistributedLzoIndexer.JOB_NAME_MAX_LENGTH_KEY, 5); + Job job = new Job(conf); + DistributedLzoIndexer.setJobName(job, args); + + assertEquals("12345...", job.getJobName()); + } + + public void testCustomLengthTruncation() throws Exception { + String[] args = new String[]{ + "hdfs://cluster/user/test/output/file-m-00000.lzo", + "hdfs://cluster/user/test/output/file-m-00001.lzo", + "hdfs://cluster/user/test/output/file-m-00002.lzo", + "hdfs://cluster/user/test/output/file-m-00003.lzo", + "hdfs://cluster/user/test/output/file-m-00003.lzo", + }; + + Configuration conf = new Configuration(false); + conf.setInt(DistributedLzoIndexer.JOB_NAME_MAX_LENGTH_KEY, 50); + Job job = new Job(conf); + DistributedLzoIndexer.setJobName(job, args); + + String expected = DistributedLzoIndexer.DEFAULT_JOB_NAME_PREFIX + " [hdfs://cluster/user/test/..."; + // Truncated length should be 50 + 3 for the "..." + assertEquals(53, expected.length()); + + assertEquals(expected, job.getJobName()); + } + + public void testDisabledTruncation() throws Exception { + String[] args = new String[]{ + "hdfs://cluster/user/test/output/file-m-00000.lzo", + "hdfs://cluster/user/test/output/file-m-00001.lzo", + "hdfs://cluster/user/test/output/file-m-00002.lzo", + "hdfs://cluster/user/test/output/file-m-00003.lzo", + "hdfs://cluster/user/test/output/file-m-00003.lzo", + }; + + Configuration conf = new Configuration(false); + conf.setInt(DistributedLzoIndexer.JOB_NAME_MAX_LENGTH_KEY, 0); + Job job = new Job(conf); + DistributedLzoIndexer.setJobName(job, args); + + String expected = DistributedLzoIndexer.DEFAULT_JOB_NAME_PREFIX + " [hdfs://cluster/user/test/output/file-m-00000.lzo, hdfs://cluster/user/test/output/file-m-00001.lzo, hdfs://cluster/user/test/output/file-m-00002.lzo, hdfs://cluster/user/test/output/file-m-00003.lzo, hdfs://cluster/user/test/output/file-m-00003.lzo]"; + assertEquals(expected, job.getJobName()); + } + +} \ No newline at end of file