Contents (hide)
1.3.1 getSplits

Java implementation under Hadoop

Example: Word counting (Hadoop 0.20.2)

  1. import java.io.IOException;
  2.  
  3. import java.util.StringTokenizer;
  4.  
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.Partitioner;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.io.LongWritable;
  16.  
  17. public class WordCount { 
  18.  
  19. public static class MapClass extends Mapper<LongWritableTextTextIntWritable{
  20.     private final static IntWritable one = new IntWritable(1);
  21.     private Text word = new Text();
  22.     
  23.     @Override
  24.     public void map(LongWritable keyText valueContext context) throws IOException,  InterruptedException {
  25.       StringTokenizer itr = new StringTokenizer(value.toString())
  26.       while (itr.hasMoreTokens()) {
  27.         word.set(itr.nextToken());  
  28.         context.write(wordone);
  29.       }
  30.     }
  31.   }
  32.  
  33.   public static class ReduceClass extends Reducer<Text,IntWritable,Text,IntWritable{
  34.     @Override
  35.     public void reduce(Text keyIterable<IntWritablevaluesContext context) throws IOException,  InterruptedException {
  36.       int sum = 0;
  37.       for (IntWritable value : values) {
  38.         sum += value.get();
  39.       }
  40.       context.write(keynew IntWritable(sum))
  41.     }
  42.   }
  43.  
  44.     public static class PartitionerClass extends Partitioner<TextIntWritable{
  45.       @Override
  46.       public int getPartition(Text keyIntWritable valueint numPartitions) {
  47.         return getLanguage(key) % numPartitions;
  48.       }
  49.     
  50.       private int getLanguage(Text key) {
  51.          if (key.getLength() > 0) {
  52.             int c = key.charAt(0);
  53.             if (c >= Long.decode("0x05D0").longValue() && c <= Long.decode("0x05EA").longValue())
  54.                return 1;
  55.          }
  56.          return 0;
  57.       }
  58.     }
  59.  
  60.  public static void main(String[] args) throws Exception {
  61.     Configuration conf = new Configuration();
  62.     //conf.set("mapred.map.tasks","10");
  63.     //conf.set("mapred.reduce.tasks","2");
  64.     Job job = new Job(conf"word count");
  65.     job.setJarByClass(WordCount.class);
  66.     job.setMapperClass(MapClass.class);
  67.     job.setPartitionerClass(PartitionerClass.class);
  68.     job.setCombinerClass(ReduceClass.class);
  69.     job.setReducerClass(ReduceClass.class);
  70.     job.setOutputKeyClass(Text.class);
  71.     job.setOutputValueClass(IntWritable.class);
  72.     FileInputFormat.addInputPath(jobnew Path(args[0]));
  73.     FileOutputFormat.setOutputPath(jobnew Path(args[1]));
  74.     System.exit(job.waitForCompletion(true) ? 0 : 1);
  75.   }
  76.  
  77. }

>javac -cp ../hadoop-0.20.2-core.jar WordCount.java
>jar cvf wordcount.jar WordCount*.class 
>../bin/hadoop jar wordcount.jar WordCount in out

Main interfaces and classes, Configuration properties

Mapper, Reducer, Partitioner, Writable.

Configuration

Configuration and Job classes, Configuration properties.

Execution Model

http://farm4.static.flickr.com/3126/3529146657_5b5d025a5f_o.png

High-level MapReduce pipeline (taken from Yahoo!'s tutorial)

http://farm3.static.flickr.com/2275/3529146683_c8247ff6db_o.png

Detailed Hadoop MapReduce data flow (taken from Yahoo!'s tutorial)

http://farm3.static.flickr.com/2374/3529959828_0b689d1d5c_o.png

Combiner step inserted into the MapReduce data flow (taken from Yahoo!'s tutorial)

How Map and Reduce operations are actually carried out

Creating a custom InputFormat-OutputFormat and RecordReader

getSplits

The current (0.20.2) implementation of getSplits() method in FileInputFormat class:

  1. public abstract class FileInputFormat<KVextends InputFormat<KV{
  2. ...
  3.   public List<InputSplitgetSplits(JobContext job
  4.                                     ) throws IOException {
  5.     long minSize = Math.max(getFormatMinSplitSize()getMinSplitSize(job));
  6.     long maxSize = getMaxSplitSize(job);
  7.  
  8.     // generate splits
  9.     List<InputSplitsplits = new ArrayList<InputSplit>();
  10.     for (FileStatus filelistStatus(job)) {
  11.       Path path = file.getPath();
  12.       FileSystem fs = path.getFileSystem(job.getConfiguration());
  13.       long length = file.getLen();
  14.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file0length);
  15.       if ((length != 0) && isSplitable(jobpath)) { 
  16.         long blockSize = file.getBlockSize();
  17.         long splitSize = computeSplitSize(blockSizeminSizemaxSize);
  18.  
  19.         long bytesRemaining = length;
  20.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  21.           int blkIndex = getBlockIndex(blkLocationslength-bytesRemaining);
  22.           splits.add(new FileSplit(pathlength-bytesRemainingsplitSize
  23.                                    blkLocations[blkIndex].getHosts()));
  24.           bytesRemaining -= splitSize;
  25.         }
  26.         
  27.         if (bytesRemaining != 0) {
  28.           splits.add(new FileSplit(pathlength-bytesRemainingbytesRemaining
  29.                      blkLocations[blkLocations.length-1].getHosts()));
  30.         }
  31.       } else if (length != 0) {
  32.         splits.add(new FileSplit(path0lengthblkLocations[0].getHosts()));
  33.       } else { 
  34.         //Create empty hosts array for zero length files
  35.         splits.add(new FileSplit(path0lengthnew String[0]));
  36.       }
  37.     }
  38.     LOG.debug("Total # of splits: " + splits.size());
  39.     return splits;
  40.   }
  41. ...
  42. }

RecordReader

TextInputFormat

The default InputFormat is TextInputFormat, which extends FileInputFormat by providing LineRecordReader.

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18.  
  19. package org.apache.hadoop.mapreduce.lib.input;
  20.  
  21. import org.apache.hadoop.fs.Path;
  22. import org.apache.hadoop.io.LongWritable;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.io.compress.CompressionCodec;
  25. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  26. import org.apache.hadoop.mapreduce.InputFormat;
  27. import org.apache.hadoop.mapreduce.InputSplit;
  28. import org.apache.hadoop.mapreduce.JobContext;
  29. import org.apache.hadoop.mapreduce.RecordReader;
  30. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  31.  
  32. /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  33.  * Either linefeed or carriage-return are used to signal end of line.  Keys are
  34.  * the position in the file, and values are the line of text.. */
  35. public class TextInputFormat extends FileInputFormat<LongWritableText{
  36.  
  37.   @Override
  38.   public RecordReader<LongWritableText
  39.     createRecordReader(InputSplit split,
  40.                        TaskAttemptContext context) {
  41.     return new LineRecordReader();
  42.   }
  43.  
  44.   @Override
  45.   protected boolean isSplitable(JobContext contextPath file) {
  46.     CompressionCodec codec = 
  47.       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  48.     return codec == null;
  49.   }
  50.  
  51. }

LineRecordReader extracts keys and values from a given text file split, where the keys are the positions of the lines and the values are the content of the line:

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18.  
  19. package org.apache.hadoop.mapreduce.lib.input;
  20.  
  21. import java.io.IOException;
  22.  
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSDataInputStream;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.LongWritable;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.io.compress.CompressionCodec;
  30. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  31. import org.apache.hadoop.mapreduce.InputSplit;
  32. import org.apache.hadoop.mapreduce.RecordReader;
  33. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  34. import org.apache.hadoop.util.LineReader;
  35. import org.apache.commons.logging.LogFactory;
  36. import org.apache.commons.logging.Log;
  37.  
  38. /**
  39.  * Treats keys as offset in file and value as line. 
  40.  */
  41. public class LineRecordReader extends RecordReader<LongWritableText{
  42.   private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
  43.  
  44.   private CompressionCodecFactory compressionCodecs = null;
  45.   private long start;
  46.   private long pos;
  47.   private long end;
  48.   private LineReader in;
  49.   private int maxLineLength;
  50.   private LongWritable key = null;
  51.   private Text value = null;
  52.  
  53.   public void initialize(InputSplit genericSplit,
  54.                          TaskAttemptContext context) throws IOException {
  55.     FileSplit split = (FileSplit) genericSplit;
  56.     Configuration job = context.getConfiguration();
  57.     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
  58.                                     Integer.MAX_VALUE);
  59.     start = split.getStart();
  60.     end = start + split.getLength();
  61.     final Path file = split.getPath();
  62.     compressionCodecs = new CompressionCodecFactory(job);
  63.     final CompressionCodec codec = compressionCodecs.getCodec(file);
  64.  
  65.     // open the file and seek to the start of the split
  66.     FileSystem fs = file.getFileSystem(job);
  67.     FSDataInputStream fileIn = fs.open(split.getPath());
  68.     boolean skipFirstLine = false;
  69.     if (codec != null) {
  70.       in = new LineReader(codec.createInputStream(fileIn)job);
  71.       end = Long.MAX_VALUE;
  72.     } else {
  73.       if (start != 0) {
  74.         skipFirstLine = true;
  75.         --start;
  76.         fileIn.seek(start);
  77.       }
  78.       in = new LineReader(fileInjob);
  79.     }
  80.     if (skipFirstLine) {  // skip first line and re-establish "start".
  81.       start += in.readLine(new Text()0,
  82.                            (int)Math.min((long)Integer.MAX_VALUEend - start));
  83.     }
  84.     this.pos = start;
  85.   }
  86.   
  87.   public boolean nextKeyValue() throws IOException {
  88.     if (key == null) {
  89.       key = new LongWritable();
  90.     }
  91.     key.set(pos);
  92.     if (value == null) {
  93.       value = new Text();
  94.     }
  95.     int newSize = 0;
  96.     while (pos < end) {
  97.       newSize = in.readLine(valuemaxLineLength,
  98.                             Math.max((int)Math.min(Integer.MAX_VALUEend-pos),
  99.                                      maxLineLength));
  100.       if (newSize == 0) {
  101.         break;
  102.       }
  103.       pos += newSize;
  104.       if (newSize < maxLineLength) {
  105.         break;
  106.       }
  107.  
  108.       // line too long. try again
  109.       LOG.info("Skipped line of size " + newSize + " at pos " + 
  110.                (pos - newSize));
  111.     }
  112.     if (newSize == 0) {
  113.       key = null;
  114.       value = null;
  115.       return false;
  116.     } else {
  117.       return true;
  118.     }
  119.   }
  120.  
  121.   @Override
  122.   public LongWritable getCurrentKey() {
  123.     return key;
  124.   }
  125.  
  126.   @Override
  127.   public Text getCurrentValue() {
  128.     return value;
  129.   }
  130.  
  131.   /**
  132.    * Get the progress within the split
  133.    */
  134.   public float getProgress() {
  135.     if (start == end) {
  136.       return 0.0f;
  137.     } else {
  138.       return Math.min(1.0f(pos - start) / (float)(end - start));
  139.     }
  140.   }
  141.   
  142.   public synchronized void close() throws IOException {
  143.     if (in != null) {
  144.       in.close()
  145.     }
  146.   }
  147. }

UserActionRecordReader

Now, let us define an InputFormat for files which contain information about actions applied by users of cellular companeys.

Each line in these files describes one action of some user, in the following format: phone_number user_name action_type action_description

We first define data types for User and UserAction. Since User is about to be a key it should implements WritableComparable, where UserAction, which is about to be a value, should implement Writable.

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4.  
  5. import org.apache.hadoop.io.WritableComparable;
  6.  
  7.  
  8. public class User implements WritableComparable<User{
  9.  
  10.     protected long phonenumber;
  11.     protected String name;
  12.             
  13.     public User() {
  14.         phonenumber = -1;
  15.         name = null;
  16.     }
  17.     
  18.     public User(long phonenumberString name) {
  19.         this.phonenumber = phonenumber;
  20.         this.name = name;
  21.     }
  22.     
  23.     public User(String userSerializationint startIndex) {
  24.         String[] toks = userSerialization.split("\t");
  25.         phonenumber = Long.parseLong(toks[startIndex]);
  26.         name = toks[startIndex+1];
  27.     }
  28.  
  29.     public long getPhoneNumber() { return phonenumber}
  30.     public String getName() { return name}
  31.     
  32.     @Override
  33.     public void readFields(DataInput in) throws IOException {
  34.         phonenumber = in.readLong();
  35.         name = in.readUTF();
  36.         
  37.     }
  38.     @Override
  39.     public void write(DataOutput out) throws IOException {
  40.         out.writeLong(phonenumber);
  41.         out.writeUTF(name);
  42.     }
  43.     
  44.     @Override
  45.     public int compareTo(User other) {
  46.         return (int) (phonenumber - other.phonenumber);
  47.     }
  48.     
  49. }

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import java.text.ParseException;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7.  
  8. import org.apache.hadoop.io.Writable;
  9.  
  10. enum ActionType { CALLCLICKBROWSE }
  11.  
  12. class Action implements Writable {
  13.  
  14.     ActionType type;
  15.     String desc;
  16.     
  17.     public Action() {
  18.         type = null;
  19.         desc = null;
  20.     }
  21.  
  22.     public Action(ActionType typeString desc) {
  23.         this.type = type;
  24.         this.desc = desc;
  25.     }
  26.     
  27.     public Action(String userActionSerializationint startIndex) {
  28.         String[] toks = userActionSerialization.split("\t");
  29.         type.valueOf(toks[startIndex])
  30.         desc = toks[startIndex+1];
  31.     }
  32.     
  33.     public ActionType getType() { return type}
  34.     public String getDesc() { return desc}
  35.     
  36.     
  37.     @Override
  38.     public void readFields(DataInput in) throws IOException {
  39.         type.valueOf(in.readUTF());
  40.         desc = in.readUTF();                
  41.     }
  42.  
  43.     @Override
  44.     public void write(DataOutput out) throws IOException {
  45.         out.writeUTF(type.name());
  46.         out.writeUTF(desc);        
  47.     }
  48.     
  49. }
  50.  
  51. public class UserAction implements Writable {
  52.         
  53.     protected Date date;
  54.     protected Action action;
  55.     
  56.     public UserAction() {
  57.         date = null;
  58.         action = new Action();
  59.     }
  60.     
  61.     public UserAction(String userActionStringint start) throws ParseException {
  62.         String[] toks = userActionString.split("\t");
  63.         date = new SimpleDateFormat().parse(toks[start])
  64.         action = new Action(toks[start+1]);
  65.     }
  66.     
  67.     @Override
  68.     public void readFields(DataInput in) throws IOException {
  69.         date = new Date(in.readLong());
  70.         action.readFields(in);
  71.     }
  72.     
  73.     @Override
  74.     public void write(DataOutput out) throws IOException {
  75.         out.writeLong(date.getTime());
  76.         action.write(out);
  77.     }
  78.     
  79.     public Action getAction() {
  80.         return action;
  81.     }
  82.     
  83.     public Date getDate() {
  84.         return date;
  85.     }
  86.     
  87. }

UserActionRecordReader makes use of LineRecordReader, in order to extract users and their actions from the input split:

  1. import java.io.IOException;
  2. import java.text.ParseException;
  3.  
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.InputSplit;
  6. import org.apache.hadoop.mapreduce.RecordReader;
  7. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  8. import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
  9.  
  10.  
  11. public class UserActionRecordReader extends RecordReader<User,UserAction{ 
  12.  
  13.     LineRecordReader reader;
  14.     
  15.     UserActionRecordReader() {
  16.         reader = new LineRecordReader()
  17.     }
  18.     
  19.     @Override
  20.     public void initialize(InputSplit splitTaskAttemptContext context)
  21.             throws IOExceptionInterruptedException {
  22.         reader.initialize(splitcontext);
  23.     }
  24.  
  25.  
  26.     @Override
  27.     public void close() throws IOException {
  28.         reader.close();        
  29.     }
  30.     
  31.     @Override
  32.     public boolean nextKeyValue() throws IOExceptionInterruptedException {
  33.         return reader.nextKeyValue();
  34.     }
  35.     
  36.     @Override
  37.     public User getCurrentKey() throws IOExceptionInterruptedException {
  38.         return new User(reader.getCurrentValue().toString(),0);
  39.     }
  40.     
  41.     @Override
  42.     public UserAction getCurrentValue() throws IOExceptionInterruptedException {
  43.         try {
  44.             return new UserAction(reader.getCurrentValue().toString(),2);
  45.         } catch (ParseException e) {
  46.             throw new IOException(e);
  47.         }    
  48.     }
  49.     
  50.     @Override
  51.     public float getProgress() throws IOExceptionInterruptedException {
  52.         return reader.getProgress();
  53.     }
  54.     
  55.     
  56.  
  57.     
  58. }

Finally, let's define the input format:

  1. import org.apache.hadoop.fs.Path;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.io.compress.CompressionCodec;
  4. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  5. import org.apache.hadoop.mapreduce.InputSplit;
  6. import org.apache.hadoop.mapreduce.JobContext;
  7. import org.apache.hadoop.mapreduce.RecordReader;
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10.  
  11. public class UserActionInputFormat extends FileInputFormat<UserUserAction{
  12.  
  13.  
  14.       @Override
  15.       public RecordReader<UserUserAction>
  16.         createRecordReader(InputSplit split,
  17.                            TaskAttemptContext context) {
  18.         return new UserActionRecordReader();
  19.       }
  20.       
  21.       @Override
  22.       protected boolean isSplitable(JobContext contextPath file) {
  23.         CompressionCodec codec =
  24.           new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  25.         return codec == null;
  26.       }
  27.  
  28. }

Main interfaces and classes

InputFormat, OutputFormat, RecordReader, RecordWriter.

Other Libraries for Parallel Computing

OpenMP

The Message Passing Interface (MPI)