# So much to do, so little time

Trying to squeeze sense out of chemical data

## Pig and Cheminformatics

Pig is a platform for analyzing large datasets. At its core is a high-level language called Pig Latin, which is focused on specifying a series of data transformations. Scripts written in Pig Latin are executed by the Pig infrastructure in either local or map/reduce mode (the latter making use of Hadoop).

Previously I had investigated Hadoop for running cheminformatics tasks such as SMARTS matching and pharmacophore searching. While the implementation of such code is pretty straightforward, it’s still pretty heavyweight compared to, say, performing SMARTS matching in a database via SQL. On the other hand, being able to perform these tasks in Pig Latin lets us write much simpler code that can be integrated with other non-cheminformatics code in a flexible manner. An example of a Pig Latin script that we might want to execute is:

```pig
A = load 'medium.smi' as (smiles:chararray);
B = filter A by net.rguha.dc.pig.SMATCH(smiles, 'NC(=O)C(=O)N');
store B into 'output.txt';
```

The script loads a file containing SMILES strings, filters for entries that match the specified SMARTS pattern, and writes the matching SMILES to an output file. Clearly, this is very similar to SQL. However, the above won’t work on a default Pig installation, since SMATCH is not a builtin function. Instead, we need to write a user-defined function (UDF).

UDFs are implemented in Java and can be classified into one of three types: eval, aggregate or filter functions. For this example I’ll consider a filter function that takes two strings representing a SMILES string and a SMARTS string and returns true if the SMILES contains the specified pattern.

```java
public class SMATCH extends FilterFunc {
    static SMARTSQueryTool sqt;
    static {
        try {
            sqt = new SMARTSQueryTool("C");
        } catch (CDKException e) {
            System.out.println(e);
        }
    }
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());

    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() < 2) return false;
        String target = (String) tuple.get(0);
        String query = (String) tuple.get(1);
        try {
            sqt.setSmarts(query);
            IAtomContainer mol = sp.parseSmiles(target);
            return sqt.matches(mol);
        } catch (CDKException e) {
            throw WrappedIOException.wrap("Error in SMARTS pattern or SMILES string " + query, e);
        }
    }
}
```

A UDF for filtering must extend the FilterFunc class, which specifies a single method, exec. Within this method, we check whether we have the requisite number of input arguments and, if so, simply return the value of the SMARTS match. For more details on filter functions see the UDF manual.

One of the key features of the code is the static initialization of the SMILES parser and SMARTS matcher. I’m not entirely sure how many times the UDF is instantiated during a query (once for each “row”? once for the entire query?), but if it’s more than once, we don’t want to instantiate the parser and matcher in the exec function. Note that since Hadoop does not use a multithreaded model, we don’t need to worry about the lack of thread safety in the CDK.

Compiling the above class and packaging it into a jar file allows us to run the above Pig Latin script from the command line (you’ll have to register the jar file at the beginning of the script by writing register /path/to/myudf.jar).
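Putting the pieces together, the complete script with the register statement in place would look something like the following (the jar path here is of course hypothetical):

```pig
-- register the jar containing the SMATCH UDF (path is hypothetical)
register /path/to/myudf.jar;

A = load 'medium.smi' as (smiles:chararray);
B = filter A by net.rguha.dc.pig.SMATCH(smiles, 'NC(=O)C(=O)N');
store B into 'output.txt';
```

which can then be run with something like pig -x local script.pig for local mode, or submitted to a map/reduce cluster.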

### Conclusions

While this experiment didn’t actually involve new algorithms or methods, it does highlight the ease with which data-intensive computation can be handled. What was really cool for me was having access to massive compute resources with a few simple command-line invocations. Another nice thing is that the Hadoop framework makes handling large data problems pretty much trivial, as opposed to chunking my data by hand, making sure each chunk is processed by a different node, and dealing with all the associated scheduling issues.

The next thing to look at is how one can access the Amazon public datasets stored on EBS from a Hadoop cluster. This will allow pharmacophore searching over the entire PubChem collection, either via the Pub3D dataset (single conformer) or via the PubChem dataset (multiple conformers). While I’ve focused on pharmacophore searching, one can consider arbitrary cheminformatics tasks.

Going further, one could consider the use of HBase, a column store based on Hadoop, as a storage system for large chemical collections and couple it to Hadoop applications. This will be useful if the use case does not involve complex relational queries. Going back to pharmacophore searches, one could imagine running searches against large collections stored in HBase and updating the database with the results; in this case, database usage is essentially simple lookups based on compound ID, as opposed to relational queries.

Finally, it’d also be useful to consider cheminformatics applications that could make use of the Map/Reduce framework at an algorithmic level, as opposed to using Map/Reduce simply to process data in chunks. Some immediate applications that come to mind include pharmacophore discovery and diversity analysis.

Written by Rajarshi Guha

May 12th, 2009 at 2:22 am

## Hadoop, Chunks and Multi-line Records

Chunking an input file

In a previous post I described how one requires a custom RecordReader class to deal with multi-line records (such as SD files) in a Hadoop program. While it worked fine on a small input file (less than 5MB), I had not addressed the issue of “chunking”, which caused it to fail when dealing with larger files (the code in that post is updated now).

When a Hadoop program is run on an input file, the framework will send chunks of the input file to individual RecordReader instances. Note that it doesn’t actually read the entire file and send around portions of it; that would not scale to petabyte files. Rather, it determines the size of the file and sends start and end offsets into the original file to the RecordReaders. They then seek to the appropriate position in the original file and do their work.
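As a rough sketch of the bookkeeping involved (this is my own illustration, not Hadoop’s actual split logic, which also accounts for HDFS block sizes and locations), the start/end offsets handed to the RecordReaders can be derived from the file size alone:

```java
// Sketch: compute (start, end) byte offsets for each chunk of a file.
// Mimics the idea behind Hadoop's input splits; names are hypothetical.
public class SplitSketch {
    static long[][] splits(long fileSize, long chunkSize) {
        int n = (int) ((fileSize + chunkSize - 1) / chunkSize); // number of chunks
        long[][] result = new long[n][2];
        for (int i = 0; i < n; i++) {
            result[i][0] = i * chunkSize;                           // start offset
            result[i][1] = Math.min((i + 1) * chunkSize, fileSize); // end offset
        }
        return result;
    }

    public static void main(String[] args) {
        // a 1000-byte file in 300-byte chunks: (0,300) (300,600) (600,900) (900,1000)
        for (long[] s : splits(1000, 300))
            System.out.println(s[0] + " - " + s[1]);
    }
}
```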

The problem with this is that when a RecordReader receives a chunk (defined in terms of start and end offsets), it can start in the middle of one record and end in the middle of another. This is shown schematically in the figure, where the input file with 5 multi-line, variable-length records is divided into 5 chunks. As you can see, in the general case, chunks don’t start or end on record boundaries.

My initial code, when faced with chunks, failed badly: rather than recognizing chunk boundaries, it simply read every record in the whole file. Alternatively (and naively), if one simply reads up to a chunk boundary, the last and first records read from that chunk will generally be invalid.

The correct (and simple) strategy for an arbitrary chunk is to first check whether the start position is 0. If it is not, we read bytes from the start position until we reach the first end-of-record marker and discard what we just read, since in general it will be an incomplete record. We then carry on reading complete records as usual. If, after reading a record, we find that the current file position is beyond the end position of the current chunk, we flag the chunk as done and return this last record. Thus, in the figure, when processing the second chunk from the top, we read bytes 101 to 120 and discard that data. We then read the initial portion of Record 3 through to the end of the record at position 250, even though we’ve gone beyond the chunk boundary at position 200. We now flag that we’re done with the chunk and carry on.
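This strategy is easy to simulate outside Hadoop with an in-memory byte array. The sketch below is my own illustration (it uses a newline as the end-of-record marker for brevity, standing in for an SD file’s delimiter) and assigns each record to the chunk that contains its first byte:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: return the records that "belong" to the chunk [start, end) of a
// stream of variable-length, newline-terminated records.
public class ChunkReader {
    static List<String> recordsInChunk(byte[] data, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = start;
        if (start != 0) {
            // we may have landed mid-record: skip to the first marker and
            // discard the partial record (the previous chunk's reader owns it)
            while (pos < data.length && data[pos] != '\n') pos++;
            pos++;
        }
        // read complete records; a record that starts before the chunk end is
        // read to completion even if it straddles the boundary, then we stop
        while (pos < data.length && pos < end) {
            StringBuilder sb = new StringBuilder();
            while (pos < data.length && data[pos] != '\n') sb.append((char) data[pos++]);
            pos++; // consume the marker
            records.add(sb.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbbb\ncc\n".getBytes();
        System.out.println(recordsInChunk(data, 0, 6));  // prints [aaa, bbbb]
        System.out.println(recordsInChunk(data, 6, 12)); // prints [cc]
    }
}
```

Running recordsInChunk over adjacent chunks yields every record exactly once, which is the guarantee the RecordReader strategy provides.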

When another RecordReader class gets the next chunk starting at position 200, it will be dumped into the middle of Record 3. But, according to our strategy, we simply read till the end of record marker at position 250 and discard the data. This is OK, since the RecordReader instance that handled the previous chunk already read the whole of Record 3.

The two edge cases here are when the chunk starts at position 0 (beginning of the input file) and the chunk ends at the end of file. In the former case, we don’t discard anything, but simply process the entire chunk plus a bit beyond it to get the entire last record for this chunk. For the latter case, we simply check whether we’re at the end of the file and flag it to the nextKeyValue() method.

The implementation of this strategy is shown in the SDFRecordReader class listing.

In hindsight this is pretty obvious, but I was bashing myself for a while and hopefully this explanation saves others some effort.

Written by Rajarshi Guha

May 6th, 2009 at 5:00 am

Posted in Uncategorized,software


My last two posts have described recent attempts at working with Hadoop, a map/reduce framework. As I noted, Hadoop for cheminformatics is quite trivial when working with SMILES files, which are line oriented, but requires a bit more work when dealing with multi-line records such as those in SD files. But now that we have an SDFInputFormat class that handles SD files, we’re ready to throw a variety of tasks at it. In this post I describe a class to perform SMARTS substructure searching using the CDK and Hadoop.

Doing substructure searching in a map/reduce framework is conceptually quite simple. In the map step, the mapper gets a record (in this case a single entry from the SD file) and performs the match using the supplied SMARTS pattern. It emits a key/value pair of the form “molid 1” if the molecule matched the pattern, or “molid 0” if it did not. In either case, molid is some identifier for the given molecule.

The reduce step simply examines each key/value pair it receives and discards those with values equal to 0. The resultant output will contain the IDs (in this case molecule titles, since we’re reading from SD files) of the molecules that matched the supplied pattern.
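Stripped of the Hadoop machinery, the reduce step is just a filter over key/value pairs; here is a minimal sketch of that logic (the class name and IDs are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: keep only the molecule IDs whose match flag is 1, mirroring what
// the reducer does with the mapper's "molid 0/1" pairs.
public class MatchFilter {
    static List<String> matchedIds(Map<String, Integer> pairs) {
        return pairs.entrySet().stream()
                .filter(e -> e.getValue() == 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> pairs = new LinkedHashMap<>();
        pairs.put("120059", 1);   // hypothetical molecule IDs
        pairs.put("3803680", 0);
        pairs.put("20060138", 1);
        System.out.println(matchedIds(pairs)); // prints [120059, 20060138]
    }
}
```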

The source for this class is given below

The map method in MoleculeMapper does the job of performing the SMARTS matching. If the molecule matches, it writes out the molecule title and the value 1. The reduce method in SMARTSMatchReducer simply examines each key/value pair and writes out those keys whose value equals 1.

Another important thing to note is that when we pass in the SMARTS pattern as a command-line parameter, it doesn’t automatically become available to the mappers, since they will in general run on different nodes than the one where you started the program. So naively storing a command-line argument in a variable in main will result in an NPE when you run the program. Instead, we read in the argument and set it as the value for an (arbitrary) key in the Configuration object. The value can then be accessed via the Context object in the mapper class, wherever the mapper is being run.

We compile this to a jar file and then run it on a 100-molecule SD file from Pub3D:

```shell
$ hadoop dfs -copyFromLocal ~/Downloads/small.sdf input.sdf
$ hadoop jar rghadoop.jar input.sdf output.sdf "[R2]"
```

The output is of the form

```shell
$ hadoop dfs -cat output.sdf/part-r-00000
120059    1
20060138  1
20060139  1
20060140  1
20060141  1
20060146  1
3803680   1
3803685   1
3803687   1
3803694   1
...
```

where each line lists the PubChem CID of a molecule that matched (27 in this case).

### Postscript

While I’ve been working on these examples with relatively small inputs on my own laptop, it’d be useful to test them out with larger datasets on a real multi-node Hadoop cluster. If anybody has such a setup (using 0.20.0 of Hadoop), I’d love to try these examples out. I’ll provide a single jar file and the large datasets.

Written by Rajarshi Guha

May 4th, 2009 at 9:24 pm

## Hadoop and SD Files

In my previous post I had described my initial attempts at working with Hadoop, an implementation of the map/reduce framework. Most Hadoop examples are based on line-oriented input files. In the cheminformatics domain, SMILES files are line oriented, so applying Hadoop to a variety of tasks that take SMILES input is easy. However, a number of problems require 3D coordinates or metadata, which SMILES cannot support. Instead, we consider SD files, where each molecule is a multi-line record. So to make use of Hadoop, we’re going to need to support multi-line records.

### Handling multi-line records

The basic idea is to create a class that will accumulate text from an SD file until it reaches the $$$$ marker indicating the end of a molecule. The resultant text is then sent to the mapper as a string. The mapper can then use the CDK to parse the string representation of the SD record into an IAtomContainer object and carry on from there. So how do we generate multi-line records for the mapper? First, we extend TextInputFormat so that our class returns an appropriate RecordReader.
```java
package net.rguha.dc.io;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SDFInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit,
                                                               TaskAttemptContext taskAttemptContext) {
        return new SDFRecordReader();
    }
}
```

The SDFRecordReader class is where all the work is done. We start off by setting some variables:

```java
public class SDFRecordReader extends RecordReader<LongWritable, Text> {
    private long end;
    private boolean stillInChunk = true;

    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();

    private byte[] endTag = "$$$$\n".getBytes();
```

The main method in this class is nextKeyValue(). The code simply reads bytes from the input until it hits the end-of-molecule marker ($$$$) and then sets the value to the resultant string and the key to the position in the file. At this point, it doesn’t really matter what we use for the key, since the mapper will usually work with a molecule identifier and some calculated property. As a result, the reducer will likely not get to see the key generated in this method.

```java
public boolean nextKeyValue() throws IOException {
    if (!stillInChunk) return false;

    boolean status = readUntilMatch(endTag, true);

    value = new Text();
    value.set(buffer.getData(), 0, buffer.getLength());
    key = new LongWritable(fsin.getPos());
    buffer.reset();

    // status is true as long as we're still within the
    // chunk we got (i.e., fsin.getPos() < end). If we've
    // read beyond the chunk it will be false
    if (!status) {
        stillInChunk = false;
    }

    return true;
}
```

The remaining methods are pretty self-explanatory and I’ve included the entire class below.

```java
package net.rguha.dc.io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class SDFRecordReader extends RecordReader<LongWritable, Text> {
    private long end;
    private boolean stillInChunk = true;

    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();

    private byte[] endTag = "$$$$\n".getBytes();

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);

        fsin = fs.open(path);
        long start = split.getStart();
        end = split.getStart() + split.getLength();
        fsin.seek(start);

        if (start != 0) {
            readUntilMatch(endTag, false);
        }
    }

    public boolean nextKeyValue() throws IOException {
        if (!stillInChunk) return false;

        boolean status = readUntilMatch(endTag, true);

        value = new Text();
        value.set(buffer.getData(), 0, buffer.getLength());
        key = new LongWritable(fsin.getPos());
        buffer.reset();

        if (!status) {
            stillInChunk = false;
        }

        return true;
    }

    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {
        fsin.close();
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            if (b == -1) return false;
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return fsin.getPos() < end;
                }
            } else i = 0;
        }
    }
}
```

### Using multi-line records

Now that we have classes to handle multi-line records, using them is pretty easy. For example, let’s rework the atom counting example to work with SD file input. In this case, the value argument of the map() method will be a String containing the SD record. We simply parse this using the MDLV2000Reader and then proceed as before.
```java
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text haCount = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        try {
            StringReader sreader = new StringReader(value.toString());
            MDLV2000Reader reader = new MDLV2000Reader(sreader);
            ChemFile chemFile = (ChemFile) reader.read((ChemObject) new ChemFile());
            List<IAtomContainer> containersList = ChemFileManipulator.getAllAtomContainers(chemFile);
            IAtomContainer molecule = containersList.get(0);
            for (IAtom atom : molecule.atoms()) {
                haCount.set(atom.getSymbol());
                context.write(haCount, one);
            }
        } catch (CDKException e) {
            e.printStackTrace();
        }
    }
}
```

Before running it, we need to tell Hadoop to use our SDFInputFormat class, and this is done in the main program driver:

```java
Configuration conf = new Configuration();
Job job = new Job(conf, "haCount count");
job.setInputFormatClass(SDFInputFormat.class);
...
```

After regenerating the jar file we’re ready to run. Our SD file contains 100 molecules taken from Pub3D, and we copy it into our local HDFS

```shell
$ hadoop dfs -copyFromLocal ~/Downloads/small.sdf input.sdf
```

and then run our program

```shell
$ hadoop jar rghadoop.jar input.sdf output.sdf
```

Looking at the output,

```shell
$ hadoop dfs -cat output.sdf/part-r-00000
Br  40
C   2683
Cl  23
F   15
H   2379
N   300
O   370
P   1
S   82
```

we get a count of the elements across the 100 molecules.

With the SDFInputFormat and SDFRecordReader classes, we are now in a position to throw pretty much any cheminformatics task onto a Hadoop cluster. The next post will look at doing SMARTS based substructure searches using this framework. Future posts will consider the performance gains (or not) when using this approach.

Written by Rajarshi Guha

May 4th, 2009 at 8:32 pm