A universal cache for Hadoop

The whole project code, written by Adi Suissa and Yaron Gonen, can be found here.

  1. package cache.io;
  2.  
  3. import java.io.IOException;
  4. import java.util.List;
  5.  
  6. import org.apache.commons.logging.Log;
  7. import org.apache.commons.logging.LogFactory;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.io.Writable;
  10. import org.apache.hadoop.mapreduce.InputFormat;
  11. import org.apache.hadoop.mapreduce.InputSplit;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.JobContext;
  14. import org.apache.hadoop.mapreduce.RecordReader;
  15. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  16. import org.apache.hadoop.util.ReflectionUtils;
  17.  
  18. import cache.CacheInputFormatId;
  19. import cache.NodeCacheManager;
  20.  
  21. public class CacheInputFormat<K extends WritableV extends Writableextends InputFormat<KV{
  22.  
  23.     public static final String DELEGATE_INPUT_FORMAT_CONF = "mapreduce.input.cache.delegateInputFormatClass";
  24.     // This is to uniquely identify the input split.
  25.     public static final String DELEGATE_INPUT_FORMAT_ID   = "mapreduce.input.cache.delegateInputFormatId";
  26.     private static final Log LOG = LogFactory.getLog(CacheInputFormat.class);
  27.  
  28.     private InputFormat<KVdelegateInputFormat = null;
  29.     private NodeCacheManager cacheManager;
  30.  
  31.     /**
  32.      * Set the delegate {@link InputFormat}.
  33.      * This method must be called during the initialization of the job.
  34.      * 
  35.      * @param job
  36.      * @param delegateInputFormatClass
  37.      */
  38.     @SuppressWarnings("unchecked")
  39.     public static void setDelegateInputFormatData(Job jobClass<? extends InputFormatdelegateInputFormatClassString inputFormatId) {
  40.         String delegateInputFormatClassName = delegateInputFormatClass.getName();
  41.         LOG.info("Setting the cache input format class to: " + delegateInputFormatClassName);
  42.         Configuration conf = job.getConfiguration();
  43.         conf.set(CacheInputFormat.DELEGATE_INPUT_FORMAT_CONFdelegateInputFormatClassName);
  44.         conf.set(CacheInputFormat.DELEGATE_INPUT_FORMAT_IDinputFormatId);
  45.     }
  46.  
  47.     /**
  48.      * Retrieve the delegate {@link InputFormat}.
  49.      * 
  50.      * @param job
  51.      * @return
  52.      */
  53.     @SuppressWarnings("unchecked")
  54.     static Class<? extends InputFormat<?,?>> getDelegateInputFormatClass(Configuration conf) {
  55.         String delegateInputFormatClassName = conf.get(CacheInputFormat.DELEGATE_INPUT_FORMAT_CONF);
  56.         Class<? extends InputFormat<?,?>> delegateInputFormatClass;
  57.         try {
  58.             delegateInputFormatClass = (Class<? extends InputFormat<?,?>>)conf.getClassByName(delegateInputFormatClassName);
  59.         } catch (ClassNotFoundException e) {
  60.             LOG.fatal("Delegate InputFormat " + delegateInputFormatClassName + " not found!!");
  61.             throw new RuntimeException();
  62.         }
  63.         return delegateInputFormatClass;
  64.     }
  65.     
  66.     public static String getDelegateInputFormatId(Configuration conf) {
  67.         return conf.get(CacheInputFormat.DELEGATE_INPUT_FORMAT_ID);
  68.     }
  69.  
  70.     public CacheInputFormat() {
  71.         cacheManager = NodeCacheManager.INSTANCE;
  72.     }
  73.     
  74.     @SuppressWarnings("unchecked")
  75.     @Override
  76.     public RecordReader<KVcreateRecordReader(InputSplit splitTaskAttemptContext context) throws IOExceptionInterruptedException {
  77.         Configuration conf = context.getConfiguration();
  78.         String cacheId = conf.get(CacheInputFormat.DELEGATE_INPUT_FORMAT_ID);
  79.         CacheInputFormatId splitId = new CacheInputFormatId(splitcacheId);
  80.  
  81.         // If the split was already cached
  82.         if (cacheManager.contains(splitId)) {
  83.             CacheRecordReader<KVcacheRecordReader = new CacheRecordReader<KV>();
  84.             cacheRecordReader.initialize(splitcontext);
  85.  
  86.             LOG.info("Getting the cache record reader.");
  87.             return cacheRecordReader;
  88.         } else {
  89.             if (this.delegateInputFormat == null) {
  90.                 this.delegateInputFormat = (InputFormat<KV>) ReflectionUtils.newInstance(getDelegateInputFormatClass(conf)conf);
  91.             }
  92.             
  93.             CacheLoaderRecordReader<KVcacheLoaderRecordReader = new CacheLoaderRecordReader<KV>();
  94.             cacheLoaderRecordReader.setDelegateRecordReader(this.delegateInputFormat.createRecordReader(splitcontext));
  95.             cacheLoaderRecordReader.initialize(splitcontext);
  96.             
  97.             LOG.info("Getting the regular record reader.");
  98.             return cacheLoaderRecordReader;
  99.         }
  100.     }
  101.  
  102.     @SuppressWarnings("unchecked")
  103.     @Override
  104.     public List<InputSplitgetSplits(JobContext context) throws IOExceptionInterruptedException {
  105.         if (this.delegateInputFormat == null) {
  106.             Configuration conf = context.getConfiguration();
  107.             Class delegateInputFormatClass = getDelegateInputFormatClass(conf);
  108.             this.delegateInputFormat = (InputFormat<KV>) ReflectionUtils.newInstance(delegateInputFormatClassconf);
  109.         } 
  110.         
  111.         return this.delegateInputFormat.getSplits(context);
  112.     }
  113.  
  114. }

  1. package cache.io;
  2.  
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5.  
  6. import org.apache.commons.logging.Log;
  7. import org.apache.commons.logging.LogFactory;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.io.Writable;
  10. import org.apache.hadoop.mapreduce.InputSplit;
  11. import org.apache.hadoop.mapreduce.RecordReader;
  12. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  13.  
  14. import util.sharedmemory.SMException;
  15. import util.sharedmemory.SharedMemory;
  16.  
  17. import cache.CacheDataEntry;
  18. import cache.CacheDataOutput;
  19. import cache.CacheInputFormatId;
  20. import cache.NodeCacheManager;
  21.  
  22. /**
  23.  * A class that reads records from some delegate record reader, and stores the records in a cache.
  24.  * 
  25.  * @author adi
  26.  *
  27.  * @param <KEYIN>
  28.  * @param <VALUEIN>
  29.  */
  30. public class CacheLoaderRecordReader<KEYIN extends WritableVALUEIN extends Writableextends RecordReader<KEYINVALUEIN{
  31.  
  32.     private static final Log LOG = LogFactory.getLog(CacheLoaderRecordReader.class);
  33.     private static final String CACHE_LOCATIONS_PATH = "/hadoop_cache_";
  34.     private static final int CACHE_EXTRA_SIZE_LENGTH = 1024;
  35.  
  36.     private RecordReader<KEYINVALUEINdelegateRecordReader = null;
  37.     private SharedMemory sm;
  38.     private AdjustableDataOutput preliminaryOutpt;
  39.     private DataOutput cacheOutput;
  40.     private int recordsStored;
  41.     private KEYIN curKey = null;
  42.     private VALUEIN curValue = null;
  43.     private InputSplit split;
  44.     private CacheInputFormatId splitId;
  45.     private String cacheLocation;
  46.     
  47.     private String keyClassName;
  48.     private String valueClassName;
  49.     
  50.     
  51.     public void setDelegateRecordReader(RecordReader<KEYINVALUEINdelegateRecordReader) {
  52.         this.delegateRecordReader = delegateRecordReader;
  53.     }
  54.     
  55.     @Override
  56.     public void close() throws IOException {
  57.         this.delegateRecordReader.close();
  58.  
  59.         // If no records are stored, just enter a dummy entry in the cache table
  60.         if (recordsStored == 0) {
  61.             CacheDataEntry entry = new CacheDataEntry(cacheLocation0recordsStored"""");
  62.  
  63.             // Add the entry to the node's cache manager
  64.             NodeCacheManager.INSTANCE.add(splitIdentry);
  65.  
  66.         } else {
  67.             LOG.info("Preliminary read " + recordsStored + " records - going to write them to shared memory");
  68.             
  69.             // Write the data from the adjustable data output to the cache data output
  70.             try {
  71.                 sm = new SharedMemory(cacheLocationpreliminaryOutpt.getSize()true);
  72.             } catch (SMException e) {
  73.                 throw new RuntimeException(e);
  74.             }
  75.     
  76.             // Lock the cache for write
  77.             try {
  78.                 sm.lockWrite();
  79.             } catch (SMException e) {
  80.                 throw new RuntimeException(e);
  81.             }
  82.             
  83.             cacheOutput = new CacheDataOutput(sm);
  84.             preliminaryOutpt.writeToOtherOutput(cacheOutput);
  85.     
  86.             // Unlock the cache
  87.             try {
  88.                 sm.unlockWrite();
  89.             } catch (SMException e) {
  90.                 throw new RuntimeException(e);
  91.             }
  92.     
  93.             // create a new cache table entry (with the size)
  94.             System.out.println("Creating a new CacheDataEntry object (cacheLocation:" + cacheLocation + ", size:" + 
  95.                     preliminaryOutpt.getSize() + ", recordsStored:" + recordsStored + ", keyClassName:" + keyClassName + ", valueClassName:" + valueClassName);
  96.             CacheDataEntry entry = new CacheDataEntry(cacheLocationpreliminaryOutpt.getSize()recordsStoredkeyClassNamevalueClassName);
  97.     
  98.             // Add the entry to the node's cache manager
  99.             NodeCacheManager.INSTANCE.add(splitIdentry);
  100.         }
  101.     }
  102.  
  103.     @Override
  104.     public KEYIN getCurrentKey() throws IOExceptionInterruptedException {
  105.         return curKey;
  106.     }
  107.  
  108.     @Override
  109.     public VALUEIN getCurrentValue() throws IOExceptionInterruptedException {
  110.         return curValue;
  111.     }
  112.  
  113.     @Override
  114.     public float getProgress() throws IOExceptionInterruptedException {
  115.         return this.delegateRecordReader.getProgress();
  116.     }
  117.  
  118.     @Override
  119.     public void initialize(InputSplit splitTaskAttemptContext context) throws IOExceptionInterruptedException {
  120.         this.delegateRecordReader.initialize(splitcontext);
  121.         this.split = split;
  122.         Configuration conf = context.getConfiguration();
  123.         String cacheId = conf.get(CacheInputFormat.DELEGATE_INPUT_FORMAT_ID);
  124.         this.splitId = new CacheInputFormatId(splitcacheId);
  125.  
  126.         cacheLocation = CACHE_LOCATIONS_PATH + split.hashCode();
  127.         LOG.info("cacheLocation: " + cacheLocation);
  128.         preliminaryOutpt = new AdjustableDataOutput();
  129.         recordsStored = 0;
  130.  
  131.     }
  132.  
  133.     @Override
  134.     public boolean nextKeyValue() throws IOExceptionInterruptedException {
  135.         boolean delegateNext = this.delegateRecordReader.nextKeyValue();
  136.         /* If there is something to read, we need to remember to store it in the cache */
  137.         if (delegateNext) {
  138.             curKey = this.delegateRecordReader.getCurrentKey();
  139.             curValue = this.delegateRecordReader.getCurrentValue();
  140.             
  141.             // Read the data into an adjustable data output
  142.             
  143.             curKey.write(preliminaryOutpt);
  144.             curValue.write(preliminaryOutpt);
  145.             if ((keyClassName == null) && (curKey != null))
  146.                 keyClassName = curKey.getClass().getCanonicalName();
  147.             if ((valueClassName == null) && (curValue != null))
  148.                 valueClassName = curValue.getClass().getCanonicalName();
  149.             recordsStored++;
  150.         }
  151.         return delegateNext;
  152.     }
  153.  
  154.     /**
  155.      * Retrieves the number of records read by this record reader.
  156.      * @return the number of records that were read by this record reader.
  157.      */
  158.     public int getRecordsRead() {
  159.         return recordsStored;
  160.     }
  161. }

  1. package cache.io;
  2.  
  3. import java.io.DataInput;
  4. import java.io.IOException;
  5.  
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.io.WritableName;
  9. import org.apache.hadoop.io.serializer.Deserializer;
  10. import org.apache.hadoop.io.serializer.SerializationFactory;
  11. import org.apache.hadoop.mapreduce.InputSplit;
  12. import org.apache.hadoop.mapreduce.RecordReader;
  13. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  14. import org.apache.hadoop.util.ReflectionUtils;
  15.  
  16. import util.sharedmemory.SMException;
  17. import util.sharedmemory.SharedMemory;
  18.  
  19. import cache.CacheBufferedDataInput;
  20. import cache.CacheDataEntry;
  21. import cache.CacheDataInput;
  22. import cache.CacheInputFormatId;
  23. import cache.NodeCacheManager;
  24.  
  25. /**
  26.  * A class that reads records from the cache.
  27.  * 
  28.  * @author adi
  29.  *
  30.  * @param <KEYIN>
  31.  * @param <VALUEIN>
  32.  */
  33. public class CacheRecordReader<KEYIN extends WritableVALUEIN extends Writableextends RecordReader<KEYINVALUEIN{
  34.     
  35.     //private RecordReader<KEYIN, VALUEIN> delegateRecordReader = null;
  36.     private Configuration conf = null;
  37.     private SharedMemory sm;
  38.     private DataInput cacheInput;
  39.     private int recordsRead;
  40.     private int recordsNum;
  41.     private KEYIN curKey;
  42.     private VALUEIN curValue;
  43.  
  44.     /*
  45.     public void setDelegateRecordReader(RecordReader<KEYIN, VALUEIN> delegateRecordReader) {
  46.         this.delegateRecordReader = delegateRecordReader;
  47.     }
  48.     */
  49.     
  50.     private KEYIN createKey(String keyClassName) {
  51.         Class keyClass;
  52.         try {
  53.             keyClass = conf.getClassByName(keyClassName);
  54.         } catch (ClassNotFoundException e) {
  55.             throw new RuntimeException(e);
  56.         }
  57.         return (KEYIN) ReflectionUtils.newInstance(keyClassconf);
  58.     }
  59.  
  60.     public VALUEIN createValue(String valueClassName) {
  61.         Class valClass;
  62.         try {
  63.             valClass = conf.getClassByName(valueClassName);
  64.         } catch (ClassNotFoundException e) {
  65.             throw new RuntimeException(e);
  66.         }
  67.         return (VALUEIN) ReflectionUtils.newInstance(valClassconf);
  68.     }
  69.  
  70.     @Override
  71.     public void close() throws IOException {
  72.         // unlock the cache (TODO - use a rw-lock instead)
  73.         //sm.unlock();
  74.         //this.delegateRecordReader.close();
  75.     }
  76.  
  77.     @Override
  78.     public KEYIN getCurrentKey() throws IOExceptionInterruptedException {
  79.         return curKey;
  80.     }
  81.  
  82.     @Override
  83.     public VALUEIN getCurrentValue() throws IOExceptionInterruptedException {
  84.         return curValue;
  85.     }
  86.  
  87.     @Override
  88.     public float getProgress() throws IOExceptionInterruptedException {
  89.         if (recordsNum == 0)
  90.             return 1;
  91.         return ((float)recordsRead / recordsNum);
  92.     }
  93.  
  94.     @Override
  95.     public void initialize(InputSplit splitTaskAttemptContext context) throws IOExceptionInterruptedException {
  96.         //this.delegateRecordReader.initialize(split, context);
  97.         this.conf = context.getConfiguration();
  98.         String cacheId = conf.get(CacheInputFormat.DELEGATE_INPUT_FORMAT_ID);
  99.         CacheInputFormatId splitId = new CacheInputFormatId(splitcacheId);
  100.  
  101.         CacheDataEntry entry = NodeCacheManager.INSTANCE.get(splitId);
  102.         
  103.         recordsRead = 0;
  104.         this.recordsNum = entry.getNumOfRecords().get();
  105.         
  106.         if (recordsNum > 0) {
  107.             try {
  108.                 sm = new SharedMemory(entry.getLocation().toString()entry.getSize().get()false);
  109.             } catch (SMException e) {
  110.                 throw new RuntimeException(e);
  111.             }
  112.             cacheInput = new CacheBufferedDataInput(sm);
  113.             
  114.             
  115.             // instantiate the key and value (so we can read data from the cache into them)
  116.             
  117.             //String keyClassName = conf.get("mapred.input.key.class");
  118.             //String valueClassName = conf.get("mapred.input.value.class");
  119.             
  120.             curKey = createKey(entry.getKeyClassName());
  121.             curValue = createValue(entry.getValueClassName());
  122.             // lock the cache (TODO - us a read-lock instead of a lock)
  123.             //sm.lock();
  124.         }
  125.     }
  126.  
  127.     @Override
  128.     public boolean nextKeyValue() throws IOExceptionInterruptedException {
  129.         boolean hasNext = (recordsNum - recordsRead) > 0;
  130.         if (hasNext) {
  131.             curKey.readFields(cacheInput);
  132.             curValue.readFields(cacheInput);
  133.             recordsRead++;
  134.         }
  135.         return hasNext;
  136.     }
  137.  
  138. }