diff --git a/people who survived and embarked in southern port b/people who survived and embarked in southern port new file mode 100644 index 0000000..76c771e --- /dev/null +++ b/people who survived and embarked in southern port @@ -0,0 +1,80 @@ +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + + public class embarked_south { + + public static class Map extends Mapper { + + private Text pcclass = new Text(); + private static final IntWritable one = new IntWritable(1); + public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { + String line = value.toString(); + String str[]=line.split(","); + + + if(str.length>11){ + + + pcclass.set(str[2]); + if((str[1].equals("1"))&& (str[11].equalsIgnoreCase("s")) ){ + + context.write(pcclass, one); + } + } + + + + } + + } + + public static class Reduce extends Reducer { + + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + int sum = 0; + + for (IntWritable val : values) { + + sum += val.get(); + } + + context.write(key, new IntWritable(sum)); + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + + @SuppressWarnings("deprecation") + Job job = new Job(conf, "Averageage_survived"); + job.setJarByClass(embarked_south.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + // job.setNumReduceTasks(0); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + Path out=new Path(args[1]); + out.getFileSystem(conf).delete(out); + job.waitForCompletion(true); + } + + }