
Data Join Example

Reducer-side join implementation, based on tagged values

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;


public abstract class TaggedValue<T extends Writable, V extends Writable> implements Writable {

    // An implementation of a tagged value, as a writable object

    protected T tag;
    protected V value;

    TaggedValue() {
        init();
    }

    TaggedValue(T tag) {
        this.tag = tag;
        this.value = null;
    }

    TaggedValue(T tag, V value) {
        this.tag = tag;
        this.value = value;
    }

    protected abstract void init();

    @Override
    public void readFields(DataInput data) throws IOException {
        tag.readFields(data);
        value.readFields(data);
    }

    @Override
    public void write(DataOutput data) throws IOException {
        tag.write(data);
        value.write(data);
    }

    @Override
    public String toString() {
        return tag + ":" + value;
    }

    @Override
    public boolean equals(Object o) {
        TaggedValue<T,V> other = (TaggedValue<T,V>) o;
        return tag.equals(other.tag) && value.equals(other.value);
    }

    public T getTag() { return tag; }
    public V getValue() { return value; }
    public void setValue(V value) { this.value = value; }
}

import org.apache.hadoop.io.Text;


public class TextTaggedValue extends TaggedValue<Text,Text> {
    public TextTaggedValue() {
        super();
    }

    public TextTaggedValue(Text tag) {
        super(tag);
    }

    public TextTaggedValue(Text tag, Text value) {
        super(tag, value);
    }

    @Override
    protected void init() {
        tag = new Text();
        value = new Text();
    }
}

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class KeyTaggedValueLineRecordReader extends RecordReader<Text, TaggedValue<Text,Text>> {

  // This record reader parses the input file into pairs of text key and tagged text value,
  // where the tag is derived from the input file name (the part before the first '-').
  // KeyValueTextRecordReader is assumed to be a plain key/value line reader
  // (e.g. a wrapper around Hadoop's KeyValueLineRecordReader).
  private KeyValueTextRecordReader reader;
  private Text tag;

  public KeyTaggedValueLineRecordReader() {
      super();
      reader = new KeyValueTextRecordReader();
  }

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      reader.initialize(genericSplit, context);
      tag = new Text(((FileSplit) genericSplit).getPath().getName().split("-")[0]);
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    return reader.nextKeyValue();
  }

  @Override
  public TaggedValue<Text,Text> getCurrentValue() throws IOException, InterruptedException {
    return new TextTaggedValue(tag, reader.getCurrentValue());
  }

  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  @Override
  public float getProgress() throws IOException {
    return reader.getProgress();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class KeyTaggedValueTextInputFormat extends FileInputFormat<Text, TaggedValue<Text,Text>> {

  // An implementation of an InputFormat of a text key and a tagged text value
  @Override
  public RecordReader<Text, TaggedValue<Text,Text>> createRecordReader(InputSplit split,
                                                                       TaskAttemptContext context) {
    return new KeyTaggedValueLineRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class ReduceSideJoin {

    // This program gets two types of values and produces join-by-key value sets
    public static class MapClass extends Mapper<Text,TaggedValue<Text,Text>,Text,TaggedValue<Text,Text>> {
        // The map gets a key and a tagged value (of 2 types) and emits the key and the value
        @Override
        public void map(Text key, TaggedValue<Text,Text> value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }


    public static class ReduceClass extends Reducer<Text,TaggedValue<Text,Text>,Text,Text> {
        @Override
        public void reduce(Text key, Iterable<TaggedValue<Text,Text>> taggedValues, Context context) throws IOException, InterruptedException {
            // The reduce gets a key and a set of values of two types (identified by their tags)
            // and generates a cross product of the two types of values
            Map<Text,List<Text>> mapTag2Values = new HashMap<Text,List<Text>>();
            for (TaggedValue<Text,Text> taggedValue : taggedValues) {
                List<Text> values = mapTag2Values.get(taggedValue.getTag());
                if (values == null) {
                    values = new LinkedList<Text>();
                    // copy the tag, since Hadoop reuses the objects handed to the reducer
                    mapTag2Values.put(new Text(taggedValue.getTag()), values);
                }
                values.add(new Text(taggedValue.getValue()));
            }
            crossProduct(key, mapTag2Values, context);
        }


        protected void crossProduct(Text key, Map<Text,List<Text>> mapTag2Values, Context context) throws IOException, InterruptedException {
            // This specific implementation of the cross product combines the data of the customers
            // and the orders (of a given customer id).
            StringBuilder sbCustomer = new StringBuilder();
            for (Text customer : mapTag2Values.get(new Text("customers"))) {
                sbCustomer.append(customer.toString());
                sbCustomer.append(",");
            }
            for (Text order : mapTag2Values.get(new Text("orders")))
                context.write(key, new Text(sbCustomer.toString() + order.toString()));
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "DataJoin");
        job.setJarByClass(ReduceSideJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyTaggedValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // the map output value type differs from the final output value type
        job.setMapOutputValueClass(TextTaggedValue.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
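For orientation, a small hypothetical walk-through (file names, separator and records are invented for illustration): the tag is taken from the part of the input file name before the first '-', so files named customers-* and orders-* produce exactly the tags looked up in crossProduct, and for each key the reducer emits one line per order, prefixed by the customer data.

    Input file customers-part0 (key/value lines, assuming the underlying reader splits on tab):
        C001    Alice,NY
        C002    Bob,CA

    Input file orders-part0:
        C001    order#1001,2009-11-01
        C001    order#1002,2009-12-15

    Join output for key C001:
        C001    Alice,NY,order#1001,2009-11-01
        C001    Alice,NY,order#1002,2009-12-15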

Reducer-side join implementation, based on secondary ordering of tagged keys

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;


public abstract class TaggedKey<T extends WritableComparable, K extends WritableComparable>
    implements WritableComparable<TaggedKey<T,K>> {

    // An implementation of a tagged key, as a writable comparable object

    protected T tag;
    protected K key;

    TaggedKey() {
        init();
    }

    TaggedKey(T tag) {
        this.tag = tag;
        this.key = null;
    }

    TaggedKey(T tag, K key) {
        this.tag = tag;
        this.key = key;
    }

    protected abstract void init();

    @Override
    public void readFields(DataInput data) throws IOException {
        tag.readFields(data);
        key.readFields(data);
    }

    @Override
    public void write(DataOutput data) throws IOException {
        tag.write(data);
        key.write(data);
    }

    @Override
    public int compareTo(TaggedKey<T,K> other) {
        // order by key first, and by tag among identical keys (secondary ordering)
        int i = key.compareTo(other.key);
        if (i == 0)
            i = tag.compareTo(other.tag);
        return i;
    }

    @Override
    public String toString() {
        return tag + ":" + key;
    }

    @Override
    public boolean equals(Object o) {
        TaggedKey<T,K> other = (TaggedKey<T,K>) o;
        return tag.equals(other.tag) && key.equals(other.key);
    }

    public T getTag() { return tag; }
    public K getKey() { return key; }
    public void setKey(K key) { this.key = key; }

}

import org.apache.hadoop.io.Text;


public class TextTaggedKey extends TaggedKey<Text,Text> {
    public TextTaggedKey() {
        super();
    }

    public TextTaggedKey(Text tag) {
        super(tag);
    }

    public TextTaggedKey(Text tag, Text key) {
        super(tag, key);
    }

    @Override
    public void init() {
        tag = new Text();
        key = new Text();
    }
}

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class TaggedKeyValueLineRecordReader extends RecordReader<TaggedKey<Text,Text>, Text> {

  // This record reader parses the input file into pairs of tagged text key and text value,
  // where the tag is derived from the input file name (the part before the first '-').
  // KeyValueTextRecordReader is assumed to be a plain key/value line reader, as above.
  private KeyValueTextRecordReader reader;
  private Text tag;


  public TaggedKeyValueLineRecordReader() {
      super();
      reader = new KeyValueTextRecordReader();
  }

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
      reader.initialize(genericSplit, context);
      tag = new Text(((FileSplit) genericSplit).getPath().getName().split("-")[0]);
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    return reader.nextKeyValue();
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  @Override
  public TaggedKey<Text,Text> getCurrentKey() throws IOException, InterruptedException {
    return new TextTaggedKey(tag, reader.getCurrentKey());
  }

  @Override
  public float getProgress() throws IOException {
    return reader.getProgress();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class TaggedKeyValueTextInputFormat extends FileInputFormat<TaggedKey<Text,Text>, Text> {

  // An implementation of an InputFormat of a tagged text key and a text value
  @Override
  public RecordReader<TaggedKey<Text,Text>, Text> createRecordReader(InputSplit split,
                                                                     TaskAttemptContext context) {
    return new TaggedKeyValueLineRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }
}

import java.io.IOException;

import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class ReduceSideJoinWithSecondaryOrdering {

    // In contrast to ReduceSideJoin, the joined data are written on-the-fly to the context,
    // without aggregating all the data in memory, by using a secondary sort on the tagged keys
    public static class MapClass extends Mapper<TaggedKey<Text,Text>,Text,TaggedKey<Text,Text>,Text> {
        // The map gets a tagged key and a value and emits the key and the value
        @Override
        public void map(TaggedKey<Text,Text> key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }


    public static class ReduceClass extends Reducer<TaggedKey<Text,Text>,Text,Text,Text> {

        Text currentTag = new Text("");
        Text currentKey = new Text("");
        List<Text> relation1ValuesList = new LinkedList<Text>();
        boolean writeMode = false;

        @Override
        public void reduce(TaggedKey<Text,Text> taggedKey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The reduce gets a tagged key and a set of values.
            // Once the first data set (sorted first in the tagged-key order) has been completely received,
            // every group of the second data set is written on-the-fly to the context,
            // by applying the cross product method.
            if (!currentKey.equals(taggedKey.getKey())) {
                relation1ValuesList.clear();
                writeMode = false;
            } else
                writeMode = !currentTag.equals(taggedKey.getTag());

            if (writeMode)
                crossProduct(taggedKey.getKey(), values, context);
            else {
                for (Text value : values)
                    relation1ValuesList.add(new Text(value.toString()));
            }

            currentTag = new Text(taggedKey.getTag().toString());
            currentKey = new Text(taggedKey.getKey().toString());
        }

        protected void crossProduct(Text key, Iterable<Text> relation2Values, Context context) throws IOException, InterruptedException {
            // This specific implementation of the cross product combines the data of the customers
            // and the orders (of a given customer id).
            StringBuilder sb = new StringBuilder();
            for (Text value1 : relation1ValuesList) {
                sb.append(value1);
                sb.append(",");
            }
            for (Text relation2Value : relation2Values)
                context.write(key, new Text(sb.toString() + relation2Value.toString()));
        }
    }

    public static class PartitionerClass extends Partitioner<TaggedKey<Text,Text>,Text> {
        // ensure that tagged keys with the same underlying key are directed to the same reducer
        @Override
        public int getPartition(TaggedKey<Text,Text> key, Text value, int numPartitions) {
            return Math.abs(key.getKey().hashCode()) % numPartitions;
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "DataJoin");
        job.setJarByClass(ReduceSideJoinWithSecondaryOrdering.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);
        job.setPartitionerClass(PartitionerClass.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(TaggedKeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // the map output key must be the concrete (instantiable) tagged key class
        job.setMapOutputKeyClass(TextTaggedKey.class);
        job.setMapOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reducer-side join with Hadoop's 0.20.0 DataJoinReducerBase pattern

Based on Hadoop in Action, Chuck Lam, 2010, section 5.2.1

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


// A demonstration of Hadoop's 0.20.0 DataJoinReducerBase pattern (the datajoin contrib package)
public class ReduceSideJoinByPattern extends Configured implements Tool {

  public static class MapClass extends DataJoinMapperBase {

    protected Text generateInputTag(String inputFile) {
      // the tag is the part of the input file name before the first '-'
      String datasource = inputFile.split("-")[0];
      return new Text(datasource);
    }

    protected Text generateGroupKey(TaggedMapOutput aRecord) {
      String line = ((Text) aRecord.getData()).toString();
      String[] tokens = line.split(",");
      String groupKey = tokens[0];
      return new Text(groupKey);
    }

    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
      TaggedWritable retv = new TaggedWritable((Text) value);
      retv.setTag(this.inputTag);
      return retv;
    }
  }

  public static class Reduce extends DataJoinReducerBase {

    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
      if (tags.length < 2)
        return null;
      String joinedStr = "";
      for (int i = 0; i < values.length; i++) {
        if (i > 0)
          joinedStr += ",";
        TaggedWritable tw = (TaggedWritable) values[i];
        String line = ((Text) tw.getData()).toString();
        String[] tokens = line.split(",", 2);
        joinedStr += tokens[1];
      }
      TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
      retv.setTag((Text) tags[0]);
      return retv;
    }
  }

  public static class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable() {
      // no-arg constructor, needed by Hadoop's reflection-based deserialization
      this.tag = new Text("");
      this.data = new Text("");
    }

    public TaggedWritable(Writable data) {
      this.tag = new Text("");
      this.data = data;
    }

    public Writable getData() {
      return data;
    }

    public void write(DataOutput out) throws IOException {
      this.tag.write(out);
      this.data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
      this.tag.readFields(in);
      this.data.readFields(in);
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    JobConf job = new JobConf(conf, ReduceSideJoinByPattern.class);
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setJobName("DataJoinByPattern");
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TaggedWritable.class);
    job.set("mapred.textoutputformat.separator", ",");
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(),
                             new ReduceSideJoinByPattern(),
                             args);
    System.exit(res);
  }
}

Mapper-side join with DistributedCache

Based on Hadoop in Action, Chuck Lam, 2010, section 5.2.2, with adaptation to Hadoop 0.20.2

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// A demonstration of Hadoop's DistributedCache tool

public class MapperSideJoinWithDistributedCache {

  public static class MapClass extends Mapper<Text, Text, Text, Text> {

    // an in-memory copy of the smaller relation, loaded from the distributed cache
    private Hashtable<String, String> joinData = new Hashtable<String, String>();

    @Override
    public void setup(Context context) {
      try {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null && cacheFiles.length > 0) {
          String line;
          String[] tokens;
          BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
          try {
            while ((line = joinReader.readLine()) != null) {
              tokens = line.split(",", 2);
              joinData.put(tokens[0], tokens[1]);
            }
          } finally {
            joinReader.close();
          }
        }
      } catch (IOException e) {
        System.err.println("Exception reading DistributedCache: " + e);
      }
    }

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
      String joinValue = joinData.get(key.toString());
      if (joinValue != null) {
        context.write(key, new Text(value.toString() + "," + joinValue));
      }
    }
  }


  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // configure the separator and the cache file before creating the job,
    // so that the settings are copied into the job's configuration
    conf.set("key.value.separator.in.input.line", ",");
    DistributedCache.addCacheFile(new URI(args[0]), conf);
    Job job = new Job(conf, "DataJoin with DistributedCache");
    job.setJarByClass(MapperSideJoinWithDistributedCache.class);
    job.setMapperClass(MapClass.class);
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    // the mapper expects plain text key/value pairs, so a key/value text input format is used here
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Multiway Joins

  • F. N. Afrati and J. D. Ullman, Optimizing joins in a map-reduce environment, EDBT, March 2010.

    Implementations of map-reduce are being used to perform many operations on very large data. We examine strategies for joining several relations in the map-reduce environment. Our new approach begins by identifying the "map-key," the set of attributes that identify the Reduce process to which a Map process must send a particular tuple. Each attribute of the map-key gets a "share," which is the number of buckets into which its values are hashed, to form a component of the identifier of a Reduce process. Relations have their tuples replicated in limited fashion, the degree of replication depending on the shares for those map-key attributes that are missing from their schema. We study the problem of optimizing the shares, given a fixed number of Reduce processes. An algorithm for detecting and fixing problems where a variable is mistakenly included in the map-key is given. Then, we consider two important special cases: chain joins and star joins. In each case we are able to determine the map-key and determine the shares that yield the least replication. While the method we propose is not always superior to the conventional way of using map-reduce to implement joins, there are some important cases involving large-scale data where our method wins, including: (1) analytic queries in which a very large fact table is joined with smaller dimension tables, and (2) queries involving paths through graphs with high out-degree, such as the Web or a social network.
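As a rough illustration of the map-key/shares idea from the abstract above (an invented sketch, not code from the paper): for a chain join R(A,B) joined with S(B,C) joined with T(C,D), the map-key can be (B,C) with shares b and c, where b·c equals the number of reducers; S tuples go to exactly one reducer, while R and T tuples are replicated across the c (respectively b) buckets of the attribute missing from their schema. The relation and attribute names below are hypothetical.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch of share-based routing for the chain join R(A,B) |><| S(B,C) |><| T(C,D).
    // Reducers are arranged in a b x c grid; bucket() hashes an attribute value into its share.
    public class ChainJoinSharesSketch {
        static final int B_SHARE = 4, C_SHARE = 4;      // shares b and c; b*c = number of reducers

        static int bucket(String value, int share) {
            return Math.abs(value.hashCode()) % share;
        }
        static int reducerId(int bBucket, int cBucket) {
            return bBucket * C_SHARE + cBucket;
        }

        // R(a,b) knows only B: replicate across all c buckets of C
        static List<Integer> reducersForR(String b) {
            List<Integer> targets = new ArrayList<Integer>();
            for (int j = 0; j < C_SHARE; j++)
                targets.add(reducerId(bucket(b, B_SHARE), j));
            return targets;
        }
        // S(b,c) knows both B and C: exactly one reducer, no replication
        static List<Integer> reducersForS(String b, String c) {
            return Collections.singletonList(reducerId(bucket(b, B_SHARE), bucket(c, C_SHARE)));
        }
        // T(c,d) knows only C: replicate across all b buckets of B
        static List<Integer> reducersForT(String c) {
            List<Integer> targets = new ArrayList<Integer>();
            for (int i = 0; i < B_SHARE; i++)
                targets.add(reducerId(i, bucket(c, C_SHARE)));
            return targets;
        }

        public static void main(String[] args) {
            // any matching R, S and T tuples meet at the single reducer that S is sent to
            System.out.println("R tuple with B=b7 goes to reducers " + reducersForR("b7"));
            System.out.println("S tuple with B=b7, C=c3 goes to reducer " + reducersForS("b7", "c3"));
            System.out.println("T tuple with C=c3 goes to reducers " + reducersForT("c3"));
        }
    }

In this model the optimization problem the paper studies is the choice of the shares b and c (subject to b·c = number of reducers) so that the total replicated communication, c·|R| + |S| + b·|T|, is minimized.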

Fuzzy Joins

  • Afrati, Foto N. and Sarma, Anish Das and Menestrina, David and Parameswaran, Aditya and Ullman, Jeffrey D., Fuzzy Joins Using MapReduce, ICDE, 2012.

    Fuzzy/similarity joins have been widely studied in the research community and extensively used in real-world applications. This paper proposes and evaluates several algorithms for finding all pairs of elements from an input set that meet a similarity threshold. The computation model is a single MapReduce job. Because we allow only one MapReduce round, the Reduce function must be designed so a given output pair is produced by only one task; for many algorithms, satisfying this condition is one of the biggest challenges. We break the cost of an algorithm into three components: the execution cost of the mappers, the execution cost of the reducers, and the communication cost from the mappers to reducers. The algorithms are presented first in terms of Hamming distance, but extensions to edit distance and Jaccard distance are shown as well. We find that there are many different approaches to the similarity-join problem using MapReduce, and none dominates the others when both communication and reducer costs are considered. Our cost analyses enable applications to pick the optimal algorithm based on their communication, memory, and cluster requirements.
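To give a feel for the single-round constraint mentioned in the abstract (each output pair must be produced by exactly one reduce group), here is a small stand-alone sketch of a splitting-style similarity join for Hamming distance, simulated in plain Java rather than as an actual MapReduce job. The splitting rule and the "first agreeing segment" deduplication are illustrative assumptions, not the paper's exact algorithms.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Single-round similarity join for Hamming distance <= D over equal-length strings:
    // "map" groups each string by (segment index, segment value) for D+1 segments
    // (two strings within distance D must agree on at least one segment, by pigeonhole);
    // "reduce" compares pairs within a group and emits a pair only in the group of the
    // FIRST segment on which the two strings agree, so each pair is produced exactly once.
    public class HammingJoinSketch {

        static final int D = 1;   // distance threshold

        static int hamming(String a, String b) {
            int h = 0;
            for (int i = 0; i < a.length(); i++)
                if (a.charAt(i) != b.charAt(i)) h++;
            return h;
        }

        // i-th of D+1 roughly equal segments of s
        static String segment(String s, int i) {
            int n = s.length(), parts = D + 1;
            return s.substring(i * n / parts, (i + 1) * n / parts);
        }

        public static void main(String[] args) {
            List<String> input = Arrays.asList("10110", "10010", "00111", "10111");

            // "map" phase: key = segment index + segment value
            Map<String, List<String>> groups = new HashMap<String, List<String>>();
            for (String s : input)
                for (int i = 0; i <= D; i++) {
                    String k = i + "#" + segment(s, i);
                    if (!groups.containsKey(k)) groups.put(k, new ArrayList<String>());
                    groups.get(k).add(s);
                }

            // "reduce" phase: compare pairs within each group
            for (Map.Entry<String, List<String>> e : groups.entrySet()) {
                int seg = Integer.parseInt(e.getKey().split("#")[0]);
                List<String> g = e.getValue();
                for (int i = 0; i < g.size(); i++)
                    for (int j = i + 1; j < g.size(); j++) {
                        String a = g.get(i), b = g.get(j);
                        if (hamming(a, b) > D) continue;
                        boolean firstAgreeingSegment = true;
                        for (int k = 0; k < seg; k++)
                            if (segment(a, k).equals(segment(b, k))) { firstAgreeingSegment = false; break; }
                        if (firstAgreeingSegment)
                            System.out.println(a + " ~ " + b);   // emitted by exactly one group
                    }
            }
        }
    }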

  • Foto N. Afrati, Anish Das Sarma, Semih Salihoglu and Jeffrey D. Ullman, Upper and Lower Bounds on the Cost of a Map-Reduce Computation, Proceedings of the VLDB Endowment, Volume 6 Issue 4, February 2013

    In this paper we study the tradeoff between parallelism and communication cost in a map-reduce computation. For any problem that is not "embarrassingly parallel," the finer we partition the work of the reducers so that more parallelism can be extracted, the greater will be the total communication between mappers and reducers. We introduce a model of problems that can be solved in a single round of map-reduce computation. This model enables a generic recipe for discovering lower bounds on communication cost as a function of the maximum number of inputs that can be assigned to one reducer. We use the model to analyze the tradeoff for three problems: finding pairs of strings at Hamming distance d, finding triangles and other patterns in a larger graph, and matrix multiplication. For finding strings at Hamming distance 1, we have upper and lower bounds that match exactly. For triangles and many other graphs, we have upper and lower bounds that are the same to within a constant factor. For the problem of matrix multiplication, we have matching upper and lower bounds for one-round map-reduce algorithms. We are also able to explore two-round map-reduce algorithms for matrix multiplication and show that these never have more communication, for a given reducer size, than the best one-round algorithm, and often have significantly less.
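The "generic recipe" mentioned in the abstract can be stated compactly. The following is a brief sketch of the argument in the authors' cost model (with q the reducer size, i.e. the maximum number of inputs per reducer), written here from the general framework rather than quoted from the paper; the Hamming-distance instantiation uses a covering bound of the flavor proved there.

    % Sketch of the lower-bound recipe (notation assumed): |I| inputs, |O| outputs,
    % reducer size q, and g(q) an upper bound on the number of outputs a reducer
    % receiving q inputs can cover. If reducer i receives q_i <= q inputs and g(x)/x
    % is non-decreasing, then every output is covered by some reducer, so
    \[
      |O| \;\le\; \sum_i g(q_i) \;\le\; \frac{g(q)}{q}\sum_i q_i
      \quad\Longrightarrow\quad
      r \;=\; \frac{\sum_i q_i}{|I|} \;\ge\; \frac{q\,|O|}{g(q)\,|I|},
    \]
    % where r is the replication rate (communication per input).
    % Example (Hamming distance 1 on b-bit strings): |I| = 2^b, |O| = b\,2^{b-1};
    % if a reducer with q inputs covers at most (q/2)\log_2 q distance-1 pairs, then
    \[
      r \;\ge\; \frac{q \cdot b\,2^{b-1}}{\tfrac{q}{2}\log_2 q \cdot 2^{b}} \;=\; \frac{b}{\log_2 q}.
    \]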