Nice Article on Open Source Cheminformatics

I met Vivien Marx at the BioIT World conference held in Boston earlier this month, where I spoke on the topic of Open Source cheminformatics. The conversations she had with me (and with Peter Murray-Rust) were incorporated into an interesting article. (Though the CDK was started at Notre Dame, it is now an internationally developed project.)

The Move is Complete

The move to Rockville is complete and I’ve started work at the NCGC. This week has been a bit hectic, what with setting up house and getting up to speed at work. But I’m really excited about the stuff that’s going on here – lots of interesting projects in various areas of chemical biology and genomics. Also, being surrounded by some really smart people is invigorating (and leads to some nice lunch discussions). One aspect of the center that I really like is the commitment to Open Source – while there isn’t a whole lot at the moment, there’s some cool stuff coming down the pipeline. And a 30″ Apple Cinema display is sweet.

For now, back to reading up on HTS, RNAi and browsing for sofas.

Cheminformatics with Hadoop and EC2

In the last few posts I’ve described how I’ve gotten up to speed on developing Map/Reduce applications using the Hadoop framework. The nice thing is that I can set it all up and test it out on my laptop and then easily migrate the application to a large production cluster. Over the past few days I converted my pharmacophore searching code into a Hadoop application. After doing some searches on small collections, it was time to test it out on a real cluster.

Cluster Setup

The cluster I used was Amazon EC2. Before running Hadoop applications, you need to set up your local environment for EC2, following these excellent instructions. With this in place, you’ll need to set up a Hadoop cluster. While one can do this using the tools that come with the Hadoop sources, Cloudera provides a very easy set of setup scripts; instructions are given here. With the Cloudera scripts installed, you can set up a 10-node cluster by doing (from your local machine)

hadoop-ec2 launch-cluster rgc 10

where rgc is the name of your cluster. There’ll be some output and it will provide you with the hostname of the master node of your cluster (to which you can ssh and start jobs). You can also visit http://hostname:50030 to track the progress of Hadoop jobs. By default this process will use c1.medium EC2 instances, though you can change this in the setup scripts. Also note that each node will run 2 map tasks – this will be useful later on.

Finally, when you’re done with the cluster remember to terminate it! Otherwise you’re going to rack up bills.
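Shutting the cluster down is just as simple – assuming your version of the Cloudera scripts uses the same sub-command name (check the script’s help output if not), it is a one-liner:

hadoop-ec2 terminate-cluster rgc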

Data Setup

So the cluster is ready to run jobs. But we need data for it. A simple approach is to use scp to copy data files onto the master node and then copy them into the HDFS on your EC2 cluster. This is not a good idea for any real-sized dataset, as you will lose all the data once the cluster is terminated. A better idea is to load the input data into S3. I use S3Fox, a Firefox extension, to load data from my laptop into S3. Once you have the data file in an S3 bucket, you can access it on an EC2 node using the following notation

s3n://AWS-ACCESS-ID:AWS-SECRET-ID@bucket/filename

For my particular setup, I obtained 136K structures from Pub3D as a single SD file and uploaded it into an S3 bucket. However, I used scp to copy my Hadoop program jar file and the pharmacophore definition file directly onto the master node, as they were relatively small. I should note that for this run, the 136K structures were only about 560MB – tiny compared to what one would usually use Hadoop for.

Program Setup

While developing the Hadoop program I had been using Hadoop 0.20.0, but Amazon only supports version 0.18.3, so some refactoring was required. The only other thing I had to modify in my program was to add the statement

conf.set("mapred.map.tasks", "20");

to indicate that the application should try and use up to 20 map tasks. While this is usually taken as a suggestion by Hadoop, my experiments indicated that without it the job would run only two map tasks (and hence use 1 node) rather than, say, 20 map tasks across 10 nodes. This is due to the way the input file is split – the default is to create 128MB splits, so this input only requires about 4 map tasks (since each split goes to a single mapper). By specifying that we want 20 map tasks, we can ‘force’ the use of multiple nodes. At this point, I’m not entirely clear as to why I need to force it this way. My understanding is that this is not required when dealing with multi-gigabyte input files.
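As a back-of-the-envelope check, here’s a little snippet of mine (it assumes the split size formula splitSize = max(minSplitSize, min(totalSize/numMapTasks, blockSize)) used by the old FileInputFormat) showing why asking for 20 map tasks drops the split size from 128MB to roughly 28MB, and hence spreads the work over all 10 nodes:

public class SplitSizeEstimate {
    public static void main(String[] args) {
        long totalSize = 560L << 20;   // ~560MB input file
        long blockSize = 128L << 20;   // the default 128MB split mentioned above
        long minSize = 1L;             // default minimum split size
        long requestedMaps = 20;       // the effect of conf.set("mapred.map.tasks", "20")

        long goalSize = totalSize / requestedMaps;                          // ~28MB
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));  // ~28MB
        long numSplits = (totalSize + splitSize - 1) / splitSize;           // 20 splits
        System.out.println((splitSize >> 20) + "MB splits, " + numSplits + " map tasks");
    }
}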

In preparation for the run, I compiled all the classes and created a single jar file containing the CDK as well as my own application classes. This avoids having to fiddle with classpaths on the Hadoop cluster. You can get the sources from my GitHub repository (the v18 branch is the one for running on Amazon).
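For reference, such a ‘fat’ jar can be put together with nothing more than the jar tool – the paths and CDK jar name below are just placeholders:

# unpack the CDK classes alongside the compiled application classes,
# then bundle everything into a single jar
cd build/classes
jar xf /path/to/cdk.jar
cd ../..
jar cf rghadoop.jar -C build/classes .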

Runs

With our cluster, data and program all set up, we can set off a run. With my input data on S3, I logged into my master node on EC2 and started the run with

hadoop jar rghadoop.jar s3n://AWS-ACCESS-ID:AWS-SECRET-ID@pcore/input.sdf output cns2.xml

While this runs you can view the job progress via http://hostname:50030 (hostname being whatever the cluster setup process provided). My initial run used a 4-node cluster and took 6 min 35 sec. However, it was simple to terminate this cluster and restart one with 10 nodes. On the new cluster the run time to process the 136K structures dropped to 3 min 33 sec. For comparison, running the same command, using 2 map tasks, took about 20 minutes on my MacBook Pro (2.4 GHz, 4GB RAM).

Cost Issues

So how much did this experiment cost? While I don’t have the exact numbers, the actual processing on the 4-node cluster cost $0.80 – four c1.medium instances at $0.20 / hour (since anything less than an hour is still billed as a full hour). Clearly, the 10-node cluster cost $2.00 – the result was obtained faster, but we could have simply stayed with the 4-node cluster and saved more than half the cost. Of course, the actual price will be a little higher since it took some time to upload the application and start the jobs. Another cost was S3 storage. Currently I’m using less than 1GB, and when bandwidth costs are taken into account this comes to about $0.25. But less than $5 overall is not too bad. There’s also a handy application to estimate costs associated with various Amazon services.

Conclusions

While this experiment didn’t actually highlight new algorithms or methods, it does highlight the ease with which data-intensive computation can be handled. What was really cool for me was having access to massive compute resources with a few simple command-line invocations. Another nice thing is that the Hadoop framework makes handling large data problems pretty much trivial – as opposed to chunking my data by hand, making sure each chunk is processed by a different node, and dealing with all the scheduling issues that come with that.

The next thing to look at is how one can access the Amazon public datasets stored on EBS from a Hadoop cluster. This would allow pharmacophore searching across the entire PubChem collection – either via the Pub3D dataset (single conformer) or via the PubChem dataset (multiple conformers). While I’ve focused on pharmacophore searching, one can consider arbitrary cheminformatics tasks.

Going further, one could consider the use of HBase, a column store based on Hadoop, as a storage system for large chemical collections and couple it to Hadoop applications. This will be useful if the use case does not involve complex relational queries. Going back to pharmacophore searches, one could imagine running searches against large collections stored in HBase and updating the database with the results – in this case, database usage is essentially simple lookups based on compound ID, as opposed to relational queries.

Finally, it’d also be useful to consider cheminformatics applications that could make use of the Map/Reduce framework at an algorithmic level, as opposed to simply using Map/Reduce to process data in chunks. Some immediate applications that come to mind include pharmacophore discovery and diversity analysis.

Hadoop, Chunks and Multi-line Records

[Figure: Chunking an input file]

In a previous post I described how one requires a custom RecordReader class to deal with multi-line records (such as SD files) in a Hadoop program. While it worked fine on a small input file (less than 5MB), I had not addressed the issue of “chunking”, which caused it to fail when dealing with larger files (the code in that post is updated now).

When a Hadoop program is run on an input file, the framework will send chunks of the input file to individual RecordReader instances. Note that it doesn’t actually read the entire file and send around portions of it – that would not scale to petabyte files! Rather, it determines the size of the file and sends start and end offsets into the original file to the RecordReaders. They then seek to the appropriate position in the original file and do their work.

The problem with this is that when a RecordReader receives a chunk (defined in terms of start and end offsets), it can start in the middle of a record and end in the middle of another record. This is shown schematically in the figure, where the input file with 5 multi-line, variable-length records is divided into 5 chunks. As you can see, in the general case, chunks don’t start or end on record boundaries.

My initial code, when faced with chunks, failed badly since, rather than recognizing chunk boundaries, it simply read every record in the whole file. Alternatively (and naively), if one simply reads up to a chunk boundary, the first and last records read from that chunk will generally be invalid.

The correct (and simple) strategy for an arbitrary chunk is to first check whether the start position is 0. If it is not, we read the bytes from the start position until we reach the first end-of-record marker. In general, the record we just read will be incomplete, so we discard it. We then carry on reading complete records as usual. But if, after reading a record, we note that the current file position is beyond the end position of the current chunk, we flag that the chunk is done and just return this last record. Thus, according to the figure, when processing the second chunk from the top, we read in bytes 101 to 120 and discard that data. We then read Record 3 from its initial portion through to the end of the record, at position 250 – even though we’ve gone beyond the chunk boundary at position 200. However, we now flag that we’re done with the chunk and carry on.

When another RecordReader instance gets the next chunk, starting at position 200, it will be dumped into the middle of Record 3. But, according to our strategy, we simply read till the end-of-record marker at position 250 and discard the data. This is OK, since the RecordReader instance that handled the previous chunk has already read the whole of Record 3.

The two edge cases here are when the chunk starts at position 0 (the beginning of the input file) and when the chunk ends at the end of the file. In the former case, we don’t discard anything, but simply process the entire chunk plus a bit beyond it to get the entire last record for the chunk. In the latter case, we simply check whether we’re at the end of the file and flag it to the nextKeyValue() method.

The implementation of this strategy is shown in the SDFRecordReader class listing.
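Since the full listing includes all the Hadoop boilerplate, here is a stripped-down sketch of the core idea – note that this is an illustration of the strategy rather than the actual SDFRecordReader, and the class and helper names are mine:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class SDFRecordReaderSketch extends RecordReader<LongWritable, Text> {
    private static final String EOR = "$$$$";   // end-of-record marker in SD files

    private long start, end, pos;               // byte range of this chunk, current file position
    private LineReader in;
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        FileSplit fileSplit = (FileSplit) split;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        FSDataInputStream stream = fileSplit.getPath()
                .getFileSystem(context.getConfiguration()).open(fileSplit.getPath());
        stream.seek(start);
        in = new LineReader(stream, context.getConfiguration());
        pos = start;

        // Unless this chunk starts at the beginning of the file, the first record is in
        // general partial: read up to the first $$$$ and discard it. The reader handling
        // the previous chunk will have read that record in full.
        if (start != 0) readRecord(false);
    }

    // Read lines up to and including the next $$$$ (or end of file), updating pos. If keep
    // is true the record text is stored in 'value'. Returns false if nothing was read.
    private boolean readRecord(boolean keep) throws IOException {
        StringBuilder sb = new StringBuilder();
        Text line = new Text();
        long bytesRead = 0;
        while (true) {
            int n = in.readLine(line);
            if (n == 0) break;                  // end of file
            pos += n;
            bytesRead += n;
            if (EOR.equals(line.toString().trim())) break;
            if (keep) sb.append(line.toString()).append("\n");
        }
        if (keep) value.set(sb.toString());
        return bytesRead > 0;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        // Once the last record we read (or the partial record discarded above) has taken us
        // past the chunk boundary, this chunk is done - the record straddling the boundary
        // has already been returned.
        if (pos > end) return false;
        key.set(pos);
        return readRecord(true);
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() {
        return end == start ? 1.0f : Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException {
        if (in != null) in.close();
    }
}

An SDFInputFormat-style InputFormat then simply returns an instance of such a reader from its createRecordReader() method.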

In hindsight this is pretty obvious, but I was bashing myself for a while and hopefully this explanation saves others some effort.

Substructure Searching with Hadoop

My last two posts have described recent attempts at working with Hadoop, a map/reduce framework. As I noted, Hadoop for cheminformatics is quite trivial when working with SMILES files, which are line oriented, but requires a bit more work when dealing with multi-line records such as those in SD files. But now that we have an SDFInputFormat class that handles SD files, we’re ready to throw a variety of tasks at it. In this post I describe a class to perform SMARTS substructure searching using the CDK and Hadoop.

Doing substructure searching in a map/reduce framework is conceptually quite simple. In the map step, the mapper gets a record (in this case a single entry from the SD file) and performs the match using the supplied SMARTS pattern. It emits a key/value pair of the form “molid 1” if the molecule matched the pattern and “molid 0” if it did not. In either case, molid is some identifier for the given molecule.

The reduce step simply examines each key/value pair it receives and discards those with values equal to 0. The resultant output will contain the IDs (in this case molecule titles, since we’re reading from SD files) of the molecules that matched the supplied pattern.

The source for this class is given below.

package net.rguha.dc;

import net.rguha.dc.io.SDFInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.openscience.cdk.CDKConstants;
import org.openscience.cdk.ChemFile;
import org.openscience.cdk.ChemObject;
import org.openscience.cdk.exception.CDKException;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.cdk.io.MDLV2000Reader;
import org.openscience.cdk.smiles.smarts.SMARTSQueryTool;
import org.openscience.cdk.tools.manipulator.ChemFileManipulator;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;

public class SubSearch {
    // a single shared query tool, initialized with a dummy pattern; the actual
    // SMARTS is set in the mapper via setSmarts()
    static SMARTSQueryTool sqt;
    static {
        try {
            sqt = new SMARTSQueryTool("C");
        } catch (CDKException e) {
            // cannot happen for the hard-coded pattern above
        }
    }

    private final static IntWritable one = new IntWritable(1);
    private final static IntWritable zero = new IntWritable(0);

    public static class MoleculeMapper extends Mapper<Object, Text, Text, IntWritable> {

        private Text matches = new Text();
        private String pattern;

        public void setup(Context context) {
            pattern = context.getConfiguration().get("net.rguha.dc.data.pattern");
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                StringReader sreader = new StringReader(value.toString());
                MDLV2000Reader reader = new MDLV2000Reader(sreader);
                ChemFile chemFile = (ChemFile) reader.read((ChemObject) new ChemFile());
                List<IAtomContainer> containersList = ChemFileManipulator.getAllAtomContainers(chemFile);
                IAtomContainer molecule = containersList.get(0);

                sqt.setSmarts(pattern);
                boolean matched = sqt.matches(molecule);
                matches.set((String) molecule.getProperty(CDKConstants.TITLE));
                if (matched) context.write(matches, one);
                else context.write(matches, zero);
            } catch (CDKException e) {
                e.printStackTrace();
            }
        }
    }

    public static class SMARTSMatchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                if (val.compareTo(one) == 0) {
                    result.set(1);
                    context.write(key, result);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: subsearch <in> <out> <pattern>");
            System.exit(2);
        }

        // need to set it before we create the Job object
        conf.set("net.rguha.dc.data.pattern", otherArgs[2]);
       
        Job job = new Job(conf, "id 1");
        job.setJarByClass(SubSearch.class);
        job.setMapperClass(MoleculeMapper.class);
        job.setCombinerClass(SMARTSMatchReducer.class);
        job.setReducerClass(SMARTSMatchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(SDFInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The map method in MoleculeMapper does the job of performing the SMARTS matching. If the molecule matches, it writes out the molecule title and the value 1. The reduce method in SMARTSMatchReducer simply examines each key/value pair and writes out those keys whose value equals 1.

Another important thing to note is that when we pass in the SMARTS pattern as a command-line parameter, it doesn’t automatically become available to the mappers, since they will, in general, be run on different nodes than the one on which you started the program. So naively storing a command-line argument in a variable in main will result in an NPE when you run the program. Instead, we read in the argument and set it as the value of an (arbitrary) key in the Configuration object (the conf.set call in main). The value can then be retrieved via the Context object in the mapper’s setup method, wherever the mapper is being run.

We compile this to a jar file, and then run it on a 100-molecule SD file from Pub3D:

$ hadoop dfs -copyFromLocal ~/Downloads/small.sdf input.sdf
$ hadoop jar rghadoop.jar input.sdf output.sdf "[R2]"

The output is of the form

$ hadoop dfs -cat output.sdf/part-r-00000
120059  1
20060138    1
20060139    1
20060140    1
20060141    1
20060146    1
3803680 1
3803685 1
3803687 1
3803694 1
...

where each line lists the PubChem CID of a molecule that matched the pattern (27 in this case).

Postscript

While I’ve been working on these examples with relatively small inputs on my own laptop, it’d be useful to test them out with larger datasets on a real multi-node Hadoop cluster. If anybody has such a setup (using 0.20.0 of Hadoop), I’d love to try these examples out. I’ll provide a single jar file and the large datasets.