In my previous post I had described my initial attempts at working with Hadoop, an implementation of the map/reduce framework. Most Hadoop examples are based on line oriented input files. In the cheminformatics domain, SMILES files are line oriented and so applying Hadoop to a variety of tasks that work with SMILES input is easy. However, a number of problems require 3D coordinates or meta data and SMILES cannot support this. Instead, we consider SD files, where each molecule is a multi-line record. So to make use of Hadoop, we’re going to need to support multi-line records.
Handling multi-line records
The basic idea is to create a class that will accumulate text from an SD file until it reaches the $$$$ marker, indicating the end of a molecule. The resultant text is then sent to the mapper as a string. The mapper can then use the CDK to parse the string representation of the SD record into an IAtomContainer object and then carry on from there.
So how do we generate multi-line records for the mapper? First, we extend TextInputFormat so that our class returns an appropriate RecordReader.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | package net.rguha.dc.io; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class SDFInputFormat extends TextInputFormat { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { return new SDFRecordReader(); } } |
The SDFRecordReader class is where all the work is done. We start of by setting some variables
1 2 3 4 5 6 7 8 9 10 11 | public class SDFRecordReader extends RecordReader<LongWritable, Text> { private long end; private boolean stillInChunk = true; private LongWritable key = new LongWritable(); private Text value = new Text(); private FSDataInputStream fsin; private DataOutputBuffer buffer = new DataOutputBuffer(); private byte[] endTag = "$$$$\n".getBytes(); |
The main method in this class is nextKeyValue(). The code in this class simply reads bytes from the input until it hits the end molecule marker ($$$$) and then sets the value to the resultant string and the key to the position in the file. At this point, it doesn’t really matter what we use for the key, since the mapper will usually work with a molecule identifier and some calculated property. As a result, the reducer will likely not get to the see the key generated in this method.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public boolean nextKeyValue() throws IOException { if (!stillInChunk) return false; boolean status = readUntilMatch(endTag, true); value = new Text(); value.set(buffer.getData(), 0, buffer.getLength()); key = new LongWritable(fsin.getPos()); buffer.reset(); // status is true as long as we're still within the // chunk we got (i.e., fsin.getPos() < end). If we've // read beyond the chunk it will be false if (!status) { stillInChunk = false; } return true; } |
The remaining methods are pretty self-explanatory and I’ve included the entire class below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 | package net.rguha.dc.io; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class SDFRecordReader extends RecordReader<LongWritable, Text> { private long end; private boolean stillInChunk = true; private LongWritable key = new LongWritable(); private Text value = new Text(); private FSDataInputStream fsin; private DataOutputBuffer buffer = new DataOutputBuffer(); private byte[] endTag = "$$$$\n".getBytes(); public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSplit split = (FileSplit) inputSplit; Configuration conf = taskAttemptContext.getConfiguration(); Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); fsin = fs.open(path); long start = split.getStart(); end = split.getStart() + split.getLength(); fsin.seek(start); if (start != 0) { readUntilMatch(endTag, false); } } public boolean nextKeyValue() throws IOException { if (!stillInChunk) return false; boolean status = readUntilMatch(endTag, true); value = new Text(); value.set(buffer.getData(), 0, buffer.getLength()); key = new LongWritable(fsin.getPos()); buffer.reset(); if (!status) { stillInChunk = false; } return true; } public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } public Text getCurrentValue() throws IOException, InterruptedException { return value; } public float getProgress() throws IOException, InterruptedException { return 0; } public void close() throws IOException { fsin.close(); } private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { int i = 0; while (true) { int b = fsin.read(); if (b == -1) return false; if (withinBlock) buffer.write(b); if (b == match[i]) { i++; if (i >= match.length) { return fsin.getPos() < end; } } else i = 0; } } } |
Using multi-line records
Now that we have classes to handle multi-line records, using them is pretty easy. For example, lets rework the atom counting example, to work with SD file input. In this case, the value argument of the map() method will be a String containing the SD record. We simply parse this using the MDLV2000Reader and then proceed as before.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text haCount = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { try { StringReader sreader = new StringReader(value.toString()); MDLV2000Reader reader = new MDLV2000Reader(sreader); ChemFile chemFile = (ChemFile)reader.read((ChemObject)new ChemFile()); List<IAtomContainer> containersList = ChemFileManipulator.getAllAtomContainers(chemFile); IAtomContainer molecule = containersList.get(0); for (IAtom atom : molecule.atoms()) { haCount.set(atom.getSymbol()); context.write(haCount, one); } } catch (CDKException e) { e.printStackTrace(); } } } |
Before running it, we need to tell Hadoop to use our SDFInputFormat class and this is done in the main program driver by
1 2 3 4 | Configuration conf = new Configuration() Job job = new Job(conf, "haCount count"); job.setInputFormatClass(SDFInputFormat.class); ... |
After regenerating the jar file we’re ready to run. Our SD file contains 100 molecules taken from Pub3D and we copy it into our local HDFS
1 | hadoop dfs -copyFromLocal ~/Downloads/small.sdf input.sdf |
and then run our program
1 | hadoop jar rghadoop.jar input.sdf output.sdf |
Looking at the output,
1 2 3 4 5 6 7 8 9 10 | $ hadoop dfs -cat output.sdf/part-r-00000 Br 40 C 2683 Cl 23 F 15 H 2379 N 300 O 370 P 1 S 82 |
we get a count of the elements across the 100 molecules.
With the SDFInputFormat and SDFRecordReader classes, we are now in a position to throw pretty much any cheminformatics task onto a Hadoop cluster. The next post will look at doing SMARTS based substructure searches using this framework. Future posts will consider the performance gains (or not) when using this approach.