So much to do, so little time

Trying to squeeze sense out of chemical data

Archive for the ‘parallel’ tag

Similarity Matrices in Parallel


Today I got an email asking whether it’d be possible to speed up a fingerprint similarity matrix calculation in R. Now, pairwise similarity matrix calculations (whether they’re for molecules or sequences or anything else) are by definition quadratic in nature. So performing these calculations for large collections isn’t always feasible – in many cases, it’s worthwhile to rethink the problem.

But for those situations where you do need to evaluate it, a simple way to parallelize the calculation is to evaluate the similarity of each molecule with all the rest in parallel. This means each process/thread must have access to the entire set of fingerprints, so again, for very large collections, this is not always practical. However, for small collections parallel evaluation can lead to speedups.

The fingerprint package provides a method to directly compute the similarity matrix for a set of fingerprints, but this is implemented in interpreted R and so is not very fast. Given a list of fingerprints, a manual evaluation of the similarity matrix can be done using nested lapply calls:

library(fingerprint)
sims <- lapply(fps, function(x) {
  unlist(lapply(fps, function(y) distance(x,y)))
})

For 1012 fingerprints, this takes 286s on my Macbook Pro (4GB, 2.4 GHz). Using snow, we can convert this to a parallel version, which takes 172s on two cores:

library(fingerprint)
library(snow)
cl <- makeCluster(4, type = "SOCK")
clusterEvalQ(cl, library(fingerprint))
clusterExport(cl, "fps")
sim <- parLapply(cl, fps, function(x) {
  unlist(lapply(fps, function(y) distance(x,y)))
})
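
In both cases the result is a list of numeric vectors, so do.call(rbind, sim) will collapse it into an actual matrix, and with snow it’s worth calling stopCluster(cl) once the calculation is done.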

Written by Rajarshi Guha

December 2nd, 2010 at 1:03 am

Pig and Cheminformatics


Pig is a platform for analyzing large datasets. At its core is a high-level language (called Pig Latin) that is focused on specifying a series of data transformations. Scripts written in Pig Latin are executed by the Pig infrastructure in either local or map/reduce modes (the latter making use of Hadoop).

Previously I had investigated Hadoop for running cheminformatics tasks such as SMARTS matching and pharmacophore searching. While the implementation of such code is pretty straightforward, it’s still pretty heavyweight compared to, say, performing SMARTS matching in a database via SQL. On the other hand, being able to perform these tasks in Pig Latin lets us write much simpler code that can be integrated with other non-cheminformatics code in a flexible manner. An example of a Pig Latin script that we might want to execute is:

A = load 'medium.smi' as (smiles:chararray);
B = filter A by net.rguha.dc.pig.SMATCH(smiles, 'NC(=O)C(=O)N');
store B into 'output.txt';

The script loads a file containing SMILES strings, then filters the entries that match the specified SMARTS pattern and writes out the matching SMILES to an output file. Clearly, very similar to SQL. However, the above won’t work on a default Pig installation since SMATCH is not a builtin function. Instead we need to look at a user-defined function (UDF).

UDFs are implemented in Java and can be classified into one of three types: eval, aggregate or filter functions. For this example I’ll consider a filter function that takes two strings, representing a SMILES string and a SMARTS pattern, and returns true if the SMILES contains the specified pattern.

package net.rguha.dc.pig;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.exception.CDKException;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.cdk.smiles.SmilesParser;
import org.openscience.cdk.smiles.smarts.SMARTSQueryTool;

public class SMATCH extends FilterFunc {
    // Set up the SMARTS matcher once, with a dummy pattern; the real pattern
    // is supplied in exec()
    static SMARTSQueryTool sqt;
    static {
        try {
            sqt = new SMARTSQueryTool("C");
        } catch (CDKException e) {
            System.out.println(e);
        }
    }
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());

    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() < 2) return false;
        String target = (String) tuple.get(0);
        String query = (String) tuple.get(1);
        try {
            sqt.setSmarts(query);
            IAtomContainer mol = sp.parseSmiles(target);
            return sqt.matches(mol);
        } catch (CDKException e) {
            throw WrappedIOException.wrap("Error in SMARTS pattern or SMILES string " + query, e);
        }
    }
}

A UDF for filtering must extend FilterFunc, which specifies a single method, exec. Within this method, we check whether we have the requisite number of input arguments and, if so, simply return the value of the SMARTS match. For more details on filter functions see the UDF manual.

One of the key features of the code is the static initialization of the SMILES parser and SMARTS matcher. I’m not entirely sure how many times the UDF is instantiated during a query (once for each “row”? Once for the entire query?) – but if it’s more than once, we don’t want to instantiate the parser and matcher in the exec function. Note that since Hadoop is not a multithreaded model, we don’t need to worry about the lack of thread safety in the CDK.
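
If a single UDF instance does end up handling many rows, one cheap refinement would be to skip recompiling the SMARTS pattern when consecutive rows carry the same query string. The variant below is a minimal, illustrative sketch (not the code from this post): it uses the same package and imports as SMATCH above, and the class name and the lastQuery field are mine.

public class SMATCHCached extends FilterFunc {
    // Same static setup as SMATCH: dummy pattern at construction time
    static SMARTSQueryTool sqt;
    static {
        try {
            sqt = new SMARTSQueryTool("C");
        } catch (CDKException e) {
            System.out.println(e);
        }
    }
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());
    static String lastQuery = null;   // the last SMARTS pattern we compiled

    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() < 2) return false;
        String target = (String) tuple.get(0);
        String query = (String) tuple.get(1);
        try {
            if (!query.equals(lastQuery)) {   // recompile only when the pattern changes
                sqt.setSmarts(query);
                lastQuery = query;
            }
            IAtomContainer mol = sp.parseSmiles(target);
            return sqt.matches(mol);
        } catch (CDKException e) {
            throw WrappedIOException.wrap("Error in SMARTS pattern or SMILES string " + query, e);
        }
    }
}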

Compiling the above class and packaging it into a jar file allows us to run the above Pig Latin script (you’ll have to register the jar file at the beginning of the script by writing register /path/to/myudf.jar) from the command line:

$ pig-0.4.0/bin/pig -x local match.pig # runs in local mode
2010-09-09 20:37:00,107 [main] INFO  org.apache.pig.Main - Logging error messages to: /Users/rguha/src/java/hadoop-0.18.3/pig_1284079020107.log
2010-09-09 20:37:39,278 [main] INFO  org.apache.pig.backend.local.executionengine.LocalPigLauncher - Successfully stored result in: "file:/Users/rguha/src/java/hadoop-0.18.3/output.txt"
2010-09-09 20:37:39,278 [main] INFO  org.apache.pig.backend.local.executionengine.LocalPigLauncher - Records written : 9
2010-09-09 20:37:39,278 [main] INFO  org.apache.pig.backend.local.executionengine.LocalPigLauncher - Bytes written : 0
2010-09-09 20:37:39,278 [main] INFO  org.apache.pig.backend.local.executionengine.LocalPigLauncher - 100% complete!
2010-09-09 20:37:39,278 [main] INFO  org.apache.pig.backend.local.executionengine.LocalPigLauncher - Success!!

As with my previous Hadoop code, the above UDF can be deployed anywhere that Hadoop and HDFS are installed – such as Amazon EC2. The code in this post (and for other Pig UDFs) is available from my repository (in the v18 branch) and is based on Pig 0.4.0 – which is pretty old, but is required to work with Hadoop 0.18.3.

Written by Rajarshi Guha

September 10th, 2010 at 2:21 am

Hadoop and Atom Counting


Over the past few months I’ve been hacking together scripts to distribute data-parallel jobs. However, it’s always nice when somebody else has done the work. In this case, Hadoop is an implementation of the map/reduce framework from Google. As Yahoo and others have shown, it’s an extremely scalable framework, and when coupled with Amazon’s EC2 it’s an extremely powerful system for processing large datasets.

I’ve been hearing a lot about Hadoop from my brother, who is working on linking R with Hadoop, and I thought that this would be a good time to try it out for myself. So the first task was to convert the canonical word-counting example to something closer to my interests – counting the occurrences of elements in a collection of SMILES. This is a relatively easy example, since SMILES files are line oriented, so it’s simply a matter of reworking the WordCount example that comes with the Hadoop distribution.

For now, I run Hadoop 0.20.0 on my Macbook Pro, following these instructions on setting up a single-node Hadoop system. I also put the bin/ directory of the Hadoop distribution in my PATH. The code employs the CDK to parse a SMILES string and identify each element.

package net.rguha.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.exception.InvalidSmilesException;
import org.openscience.cdk.interfaces.IAtom;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.cdk.smiles.SmilesParser;

import java.io.IOException;

public class HeavyAtomCount {
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                IAtomContainer molecule = sp.parseSmiles(value.toString());
                for (IAtom atom : molecule.atoms()) {
                    word.set(atom.getSymbol());
                    context.write(word, one);
                }
            } catch (InvalidSmilesException e) {
                // do nothing for now
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(HeavyAtomCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This is compiled in the usual manner and converted to a jar file. For some reason, Hadoop wouldn’t run the class unless the CDK classes were also included in the jar file (i.e., the -libjars argument didn’t seem to let me specify the CDK libraries separately from my code). So the end result was to include the whole CDK in my Hadoop program jar.

OK, the next thing was to create an input file. I extracted 10,000 SMILES from PubChem and copied them into my local HDFS by

hadoop dfs -copyFromLocal ~/Downloads/pubchem.smi input.smi

Then running my program is simply

hadoop jar rghadoop.jar input.smi output.smi

There’s quite a bit of output, though the interesting portion is

09/05/04 14:45:58 INFO mapred.JobClient: Counters: 17
09/05/04 14:45:58 INFO mapred.JobClient:   Job Counters
09/05/04 14:45:58 INFO mapred.JobClient:     Launched reduce tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Launched map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Data-local map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:   FileSystemCounters
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_READ=533
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_READ=482408
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1098
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=336
09/05/04 14:45:58 INFO mapred.JobClient:   Map-Reduce Framework
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input groups=0
09/05/04 14:45:58 INFO mapred.JobClient:     Combine output records=60
09/05/04 14:45:58 INFO mapred.JobClient:     Map input records=10000
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce shuffle bytes=0
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce output records=0
09/05/04 14:45:58 INFO mapred.JobClient:     Spilled Records=120
09/05/04 14:45:58 INFO mapred.JobClient:     Map output bytes=1469996
09/05/04 14:45:58 INFO mapred.JobClient:     Combine input records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Map output records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input records=60

So we’ve processed 10,000 records, which is good. To see what was generated, we do

hadoop dfs -cat output.smi/part-r-00000

and we get

Ag    13
Al    11
Ar    1
As    6
Au    4
B    49
Ba    7
Bi    1
Br    463
C    181452
Ca    5
Cd    2
Cl    2427
....

Thus across the entire 10,000 molecules, there were 13 occurrences of Ag, 181,452 occurrences of carbon and so on.

Something useful?

OK, so this is a rather trivial example. But it was quite simple to create and, more importantly, I should be able to take this jar file and run it on a proper multi-node Hadoop cluster and work with the entire PubChem collection.

A more realistic use case is SMARTS searching. In this case, the mapper would simply emit the molecule title along with an indication of whether it matched the supplied pattern (say 1 for a match, 0 otherwise), and the reducer would collect the key/value pairs for which the value was 1. Since this can be done with SMILES input, it is quite simple, as sketched below.
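
As a rough sketch (illustrative, not code I’ve run), the mapper and reducer for such a search might look like the following. It assumes each input line has the form “SMILES title”, that the SMARTS pattern is passed in through the job configuration under a key I’ve made up (smarts.pattern), and that the classes sit inside a driver like HeavyAtomCount above; the extra CDK imports needed are SMARTSQueryTool and CDKException, and the class names are likewise mine.

    public static class SmartsMatchMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private static final IntWritable ZERO = new IntWritable(0);
        private final SmilesParser parser = new SmilesParser(DefaultChemObjectBuilder.getInstance());
        private SMARTSQueryTool queryTool;

        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                // pattern comes in via the job configuration; the default is the amide
                // pattern from the Pig example in the previous post
                queryTool = new SMARTSQueryTool(context.getConfiguration().get("smarts.pattern", "NC(=O)C(=O)N"));
            } catch (CDKException e) {
                throw new IOException(e);
            }
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("\\s+", 2);
            String smiles = fields[0];
            String title = fields.length > 1 ? fields[1] : smiles; // fall back to the SMILES as the title
            try {
                IAtomContainer mol = parser.parseSmiles(smiles);
                context.write(new Text(title), queryTool.matches(mol) ? ONE : ZERO);
            } catch (CDKException e) {
                // skip molecules the CDK cannot parse
            }
        }
    }

    public static class MatchOnlyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable v : values) {
                if (v.get() == 1) {   // keep only the titles that matched
                    context.write(key, v);
                    break;
                }
            }
        }
    }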

A slightly non-trivial task is to apply this framework to SD files. My motivation is that I’d like to run pharmacophore searches across large collections, without having to split up large SD files by hand. Hadoop is an excellent framework for this. The problem is that most Hadoop examples work with line-oriented data files. SD files are composed of multi-line records and so this type of input requires some extra work, which I’ll describe in my next post.

Debugging note

When debugging, it’s useful to edit the Hadoop config file to include

<property>
  <name>mapred.job.tracker</name>
  <value>local</value>
  <description>foo</description>
</property>

so that it runs as a single process.

Written by Rajarshi Guha

May 4th, 2009 at 7:24 pm

Easy Parallelization With Clojure


As I noted in my previous post, one of the nice features of Clojure is its support for concurrent programming. It provides some fancy features that allow one to write complex parallel programs, though I’m certainly no expert on that topic. However, one thing that I do every day is perform operations on the elements of a list. Traditionally, this is a serial operation. But what’d be nice is to have my compiler (or environment) perform this operation in parallel over the elements of the list. Clojure provides a very simple way to do this – pmap.

The map form simply applies a function to the elements of a list (or to corresponding elements of multiple lists) in order, returning a list. By prepending a “p”, the operation is done in parallel, making use of as many cores as are present on your system (but see below). Given the ease of this operation, let’s see what we can do with pmap and the CDK.

Based on the previous post, let’s calculate fingerprints in a serial fashion followed by the parallel version and see how long each takes. For completeness, I’ll repeat the code from the previous post. First import our packages and set up some basic objects and functions.

(import '(org.openscience.cdk.smiles SmilesParser))
(import '(org.openscience.cdk DefaultChemObjectBuilder))
(import '(org.openscience.cdk.fingerprint MACCSFingerprinter))
(import '(org.openscience.cdk.fingerprint Fingerprinter))
(import '(org.openscience.cdk.fingerprint ExtendedFingerprinter))
(import '(org.openscience.cdk.smiles.smarts SMARTSQueryTool))
;; so we can read lines from a file
(use 'clojure.contrib.duck-streams)

(def sp (new SmilesParser (. DefaultChemObjectBuilder (getInstance))))
(def fprinter (new ExtendedFingerprinter))

(defn getmol [smiles] (. sp (parseSmiles smiles)))
(defn getfp [mol] (. fprinter (getFingerprint mol)))

Next, we load the 4,688 molecules from the data file I described previously. In contrast to before, this code is slightly shorter (thanks to Nik) but also uses the doall form. This forces evaluation of the list, so that the molecules are all loaded into memory. In the timing code we again use it, since if we don’t, we just get the time for the list creation step (which is “instantaneous” due to lazy evaluation), rather than the actual list evaluation.

(def mols (doall (map #(getmol (. % trim))
              (read-lines "junk.smi"))))

Now, we can evaluate the fingerprints and time the operation. Initially I performed these calculations on my Macbook Pro with 2GB RAM and a dual-core CPU.

(time (def fpserial (doall (map getfp mols))))

This run took 38.8 s (averaged over three runs). Next we consider the parallel version

(time (def fpparallel (doall (pmap getfp mols))))

This version has an average run time of 23.4 s – a 1.6x speedup. Now, it’s not exactly a two-fold speedup. Part of the reason is that there is some overhead for the threads. Also, even in the serial version, the garbage collector takes up some of the second core, and in the parallel version this will contend with the actual calculation.

Just to be sure that the calculation works OK, let’s compare (via BitSet.equals) the fingerprints obtained using the two versions. We expect the result of the code below to be 0

(count (filter #(not %)
               (map (fn [x y]
                      (. x (equals y)))
                    fpserial fpparallel)))

and that’s exactly what we get.

What about using more cores? I have access to some dual-CPU machines with 8GB of RAM, each CPU having four cores. Repeating the above calculations, the serial version takes 28.1 s and the parallel version takes 8.6 s, a 3.2x speedup. One thing I noted was that this really only uses the cores on one CPU, rather than all eight cores.

One thing that will require more investigation is to what extent we can make use of the CDK in parallel environments, since the library was not designed with thread-safety in mind. For example, parsing the SMILES strings using pmap (after reading in all the lines from the file) gives me an ExecutionException error.
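
One workaround worth trying – sketched in Java, since the CDK itself is a Java library, and assuming the failures stem from shared parser state – is to give each thread its own parser instance so that no CDK object is shared across threads. The wrapper class below is purely illustrative, not a CDK API:

import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.smiles.SmilesParser;

// Illustrative helper: each thread lazily gets its own SmilesParser instance.
public class PerThreadSmilesParser {
    private static final ThreadLocal<SmilesParser> PARSER = new ThreadLocal<SmilesParser>() {
        protected SmilesParser initialValue() {
            return new SmilesParser(DefaultChemObjectBuilder.getInstance());
        }
    };

    public static SmilesParser get() {
        return PARSER.get();   // never shared between threads
    }
}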

In any case, it’s very cool that I can use multiple cores just by converting map to pmap.

Written by Rajarshi Guha

March 6th, 2009 at 5:15 pm

Posted in cheminformatics, software


Multi-threaded Database Access with Python


Pub3D contains about 17.3 million 3D structures for PubChem compounds, stored in a Postgres database. One of the things we wanted to do was 3D similarity searching, and to achieve that we’ve been employing the Ballester and Graham-Richards method. In this post I’m going to talk about performance – how we went from a single monolithic database with long query times to multiple databases and significantly faster multi-threaded queries.


Written by Rajarshi Guha

November 14th, 2008 at 4:46 pm