Contents (hide)
1.1 Tasks

Data Processing Examples

Tasks

Extract caller-recipient phone number graph

  1. import java.io.IOException;
  2. import org.apache.hadoop.mapreduce.Mapper;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import org.apache.hadoop.io.LongWritable;
  5.  
  6. public class GenerateCallerRecipientGraph { 
  7.  
  8. public static class MapperClass extends Mapper<User,UserAction,LongWritable,LongWritable{
  9.     @Override
  10.     public void map(User userUserAction userActionContext context) throws IOException,  InterruptedException {
  11.         
  12.         if (userAction.getAction().getType() == ActionType.CALL)
  13.             context.write(
  14.                 new LongWritable(user.getPhoneNumber()),
  15.                 new LongWritable(Long.parseLong(userAction.getAction().getDesc())));
  16.     }
  17. }
  18.  
  19.  
  20.  public static class ReducerClass extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable{
  21.     @Override
  22.     public void reduce(LongWritable callerIterable<LongWritablerecipientsContext context) throws IOException,  InterruptedException {
  23.         for (LongWritable recipient : recipients)
  24.             context.write(callerrecipient);        
  25.     }    
  26.   }
  27. }

Invert caller-recipient graph to recipient-caller graph

  1. import java.io.IOException;
  2.  
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6.  
  7.  
  8. public class InvertCallerRecipient {
  9.     public static class MapperClass  extends Mapper<LongWritable,LongWritable,LongWritable,LongWritable{
  10.         @Override
  11.         public void map(LongWritable callerLongWritable recipientContext context) throws IOException,  InterruptedException {
  12.             context.write(recipientcaller);
  13.         }
  14.     }
  15.  
  16.  
  17.     public static class ReducerClass  extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable{
  18.         @Override
  19.         public void reduce(LongWritable recipientIterable<LongWritablecallersContext context) throws IOExceptionInterruptedException {
  20.             for (LongWritable caller : callers)
  21.                 context.write(recipientcaller);
  22.         }
  23.     }
  24. }

Count the number of callers for each recipient

  1. import java.io.IOException;
  2.  
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import org.apache.hadoop.mapreduce.Reducer;
  7.  
  8.  
  9. public class InvertAndCountCallerRecipient {
  10.     public static class MapperClass  extends Mapper<LongWritable,LongWritable,LongWritable,LongWritable{
  11.         @Override
  12.         public void map(LongWritable callerLongWritable recipientContext context) throws IOException,  InterruptedException {
  13.             context.write(recipientcaller);
  14.         }
  15.     }
  16.  
  17.  
  18.     public static class ReducerClass  extends Reducer<LongWritable,LongWritable,LongWritable,IntWritable{
  19.         @Override
  20.         public void reduce(LongWritable recipientIterable<LongWritablecallersContext context) throws IOExceptionInterruptedException {
  21.             int count = 0;
  22.             for (LongWritable caller : callers)
  23.                 count++;
  24.             context.write(recipientcount);
  25.         }
  26.     }
  27. }

Generate caller count histogram

  1. import java.io.IOException;
  2.  
  3.  
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import org.apache.hadoop.mapreduce.Reducer;
  7. import org.apache.hadoop.io.LongWritable;
  8.  
  9. public class CallerHistogram { 
  10.  
  11. public static class MapperClass extends Mapper<LongWritable,IntWritable,IntWritable,IntWritable{
  12.     @Override
  13.     public void map(LongWritable recipientIntWritable numOfCallersContext context) throws IOException,  InterruptedException {
  14.         context.write(numOfCallers,new IntWritable(1));
  15.     }
  16.   }
  17.  
  18.   public static class ReducerClass extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable{
  19.     @Override
  20.     public void reduce(IntWritable numOfCallersIterable<IntWritablecountsContext context) throws IOException,  InterruptedException {
  21.         int sum = 0;
  22.         for (IntWritable count : counts)
  23.             sum += count.get();
  24.         context.write(numOfCallersnew IntWritable(sum));
  25.     }
  26.   }
  27.  
  28. }

Distribution of calls per user per year

  1. import java.io.DataInput;
  2.  
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5.  
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.WritableComparable;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15.  
  16. class Call implements WritableComparable<Call{
  17.     long caller;
  18.     long recipient;
  19.     int year;
  20.     
  21.     Call() {
  22.         this.caller = -1;
  23.         this.recipient = -1;
  24.         this.year = -1;
  25.     }
  26.     
  27.     Call(long callerlong recipientint year) {
  28.         this.caller = caller;
  29.         this.recipient = recipient;
  30.         this.year = year;
  31.     }
  32.     
  33.     @Override
  34.     public void readFields(DataInput in) throws IOException {
  35.         this.caller = in.readLong();
  36.         this.recipient = in.readLong();
  37.         this.year = in.readInt();
  38.         
  39.     }
  40.     @Override
  41.     public void write(DataOutput out) throws IOException {
  42.         out.writeLong(caller);
  43.         out.writeLong(recipient);
  44.         out.writeInt(year);
  45.         
  46.     }
  47.     @Override
  48.     public int compareTo(Call other) {
  49.         int ret = (int) (caller - other.caller);
  50.         if (ret == 0)
  51.             ret = (int) (recipient - other.recipient);
  52.         if (ret == 0)
  53.             ret = year - other.year;
  54.  
  55.         return ret;
  56.     }
  57.     
  58.     
  59. }
  60.  
  61.  
  62. public class DistributePhoneCallsPerYear { 
  63.  
  64. public static class MapperClass extends Mapper<User,UserAction,Call,IntWritable{
  65.     @Override
  66.     public void map(User userUserAction userActionContext context) throws IOException,  InterruptedException {
  67.         
  68.         if (userAction.getAction().getType() == ActionType.CALL)
  69.         context.write(
  70.                 new Call(
  71.                         user.getPhoneNumber(),
  72.                         Long.parseLong(userAction.getAction().desc),
  73.                         userAction.getDate().getYear()),
  74.                 new IntWritable(1));
  75.     }
  76.   }
  77.  
  78.   public static class ReducerClass extends Reducer<Call,IntWritable,Call,IntWritable{
  79.     @Override
  80.     public void reduce(Call callIterable<IntWritablecountsContext context) throws IOException,  InterruptedException {
  81.         int sum = 0;
  82.         for (IntWritable count : counts)
  83.             sum += count.get();
  84.             context.write(callnew IntWritable(sum));        
  85.     }
  86.   }
  87.  
  88.   
  89.  public static void main(String[] args) throws Exception {
  90.     Configuration conf = new Configuration();
  91.     Job job = new Job(conf"word count");
  92.     job.setJarByClass(WordCount.class);
  93.     job.setMapperClass(MapperClass.class);
  94.     job.setReducerClass(ReducerClass.class);
  95.     job.setMapOutputKeyClass(Call.class);
  96.     job.setMapOutputValueClass(IntWritable.class);
  97.     job.setOutputKeyClass(Call.class);
  98.     job.setOutputValueClass(IntWritable.class);
  99.     job.setInputFormatClass(UserActionInputFormat.class);
  100.     FileInputFormat.addInputPath(jobnew Path(args[0]));
  101.     FileOutputFormat.setOutputPath(jobnew Path(args[1]));
  102.     System.exit(job.waitForCompletion(true) ? 0 : 1);
  103.   }
  104.  
  105. }

Task Chain Example

Pipeline of dependent jobs using JobControl

  1. import java.util.LinkedList;
  2.  
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  6. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  8. import org.apache.hadoop.mapred.InputFormat;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapred.Mapper;
  11. import org.apache.hadoop.mapred.OutputFormat;
  12. import org.apache.hadoop.mapred.Reducer;
  13. import org.apache.hadoop.mapred.jobcontrol.Job;
  14. import org.apache.hadoop.mapred.jobcontrol.JobControl;
  15. import org.apache.hadoop.mapred.lib.ChainMapper;
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
  18. import org.apache.hadoop.conf.Configuration;
  19. import org.apache.hadoop.io.Text;
  20.  
  21. public class CitationPipeline {
  22.     public static void main(String[] args) {
  23.  
  24.         // Example of applying a pipeline of two mapper/reducer jobs, by using JobControl object,
  25.         // over a depending constraint.
  26.         // The first Job counts the number of citations for each patent,
  27.         // and the second job generate the histogram
  28.         // 0.21.0 compliant! in order to work with 0.20.2 use JobConf/JobClient mechanism
  29.         // Note: The code was not checked!!!
  30.  
  31.         try {
  32.             JobConf jobconf1 = new JobConf(new Configuration());
  33.             Job job1 = new Job(jobconf1);
  34.             jobconf1.setJarByClass(InvertAndCountCallerRecipient.class);
  35.             jobconf1.setMapperClass(InvertAndCountCallerRecipient.MapClass.class);
  36.             jobconf1.setReducerClass(InvertAndCountCallerRecipient.ReduceClass.class);
  37.             jobconf1.setMapOutputKeyClass(LongWritable.class);
  38.             jobconf1.setMapOutputValueClass(LongWritable.class);
  39.             jobconf1.setOutputKeyClass(LongWritable.class);
  40.             jobconf1.setOutputValueClass(IntWritable.class);
  41.             FileInputFormat.addInputPath(job1.getJob()new Path(args[0]));
  42.             FileOutputFormat.setOutputPath(job1.getJob()new Path("intermediate"));
  43.  
  44.             JobConf jobconf2 = new JobConf(new Configuration());
  45.             Job job2 = new Job(jobconf2);
  46.             jobconf2.setJarByClass(CallerHistogram.class);
  47.             jobconf2.setMapperClass(CallerHistogram.MapClass.class);
  48.             jobconf2.setReducerClass(CallerHistogram.ReduceClass.class);
  49.             jobconf2.setMapOutputKeyClass(IntWritable.class);
  50.             jobconf2.setMapOutputValueClass(IntWritable.class);
  51.             jobconf2.setOutputKeyClass(IntWritable.class);
  52.             jobconf2.setOutputValueClass(IntWritable.class);
  53.             FileInputFormat.addInputPath(job1.getJob()new Path("intermediate"));
  54.             FileOutputFormat.setOutputPath(job1.getJob()new Path(args[1]));
  55.  
  56.             job2.addDependingJob(job1);
  57.  
  58.             JobControl jc = new JobControl("JC");
  59.             jc.addJob(job1);
  60.             jc.addJob(job2);
  61.             jc.run();
  62.         } catch (Exception e) {
  63.  
  64.         }
  65.     }
  66. }

Chaining of pre-processing and post-processing mappers to a reducer

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  11. import org.apache.hadoop.mapred.lib.ChainMapper;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13.  
  14. public class SpeechLemmaIndexingChain {
  15.     public static void main(String[] args) {
  16.  
  17.         // Example of direct chaining of several mappers (as preprocessors) to a reducer
  18.         // The first mapper gets a sound file and generate its text extraction
  19.         // The second mapper converts the tokens in the text to the equivalent lexemes
  20.         // The third mapper generates the inverted index
  21.         // The reducer emits the inverted index to the index file
  22.         // 0.21.0 compliant! in order to work with 0.20.2 use JobConf/JobClient mechanism
  23.         // Note: The code was not checked!!!
  24.  
  25.         Configuration conf = new Configuration();
  26.         Job job = new Job(conf"Job with chained tasks");
  27.         job.setJarByClass(SpeechLemmaIndexingChain.class);
  28.         job.setInputFormatClass(TextInputFormat.class);
  29.         job.setOutputFormatClass(TextOutputFormat.class);
  30.         FileInputFormat.setInputPaths(jobnew Path(args[0]));
  31.         FileOutputFormat.setOutputPath(jobnew Path(args[1]));
  32.  
  33.         Configuration mapText2SpeechConf = new Configuration(false);
  34.         ChainMapper.addMapper(jobSpeechRecognitionMapper.class,
  35.                               IntWritable.classSound.class,IntWritable.classText.class,
  36.                               truemapText2SpeechConf);
  37.  
  38.         Configuration mapLemmatizerConf = new Configuration(false);
  39.         ChainMapper.addMapper(jobLematizerMapper.class,
  40.                               IntWritable.classText.class,IntWritable.classText.class,
  41.                               truemapLemmatizerConf);
  42.  
  43.         Configuration mapIndexerConf = new Configuration(false);
  44.         ChainMapper.addMapper(jobIndexMapper.class,
  45.                               IntWritable.classText.class,Text.classIntWritable.class,
  46.                               truemapIndexerConf);
  47.  
  48.         Configuration reduceIndexerConf = new Configuration(false);
  49.         ChainReducer.setReducer(jobIndexReducer.class,
  50.                             LongWritable.classText.class,Text.classText.class,
  51.                             truereduceConf);
  52.  
  53.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  54.  
  55.     }
  56. }