So much to do, so little time

Trying to squeeze sense out of chemical data

Archive for the ‘distributed computing’ tag

Hadoop and SD Files


In my previous post I described my initial attempts at working with Hadoop, an implementation of the map/reduce framework. Most Hadoop examples are based on line-oriented input files. In the cheminformatics domain, SMILES files are line-oriented, so applying Hadoop to a variety of tasks that take SMILES input is easy. However, a number of problems require 3D coordinates or metadata, which SMILES cannot provide. Instead, we consider SD files, where each molecule is a multi-line record. So to make use of Hadoop, we’re going to need to support multi-line records.

Handling multi-line records

The basic idea is to create a class that will accumulate text from an SD file until it reaches the $$$$ marker, indicating the end of a molecule. The resultant text is then sent to the mapper as a string. The mapper can then use the CDK to parse the string representation of the SD record into an IAtomContainer object and then carry on from there.

So how do we generate multi-line records for the mapper? First, we extend TextInputFormat so that our class returns an appropriate RecordReader.

package net.rguha.dc.io;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SDFInputFormat extends TextInputFormat {
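    // hand Hadoop our SDFRecordReader so that each call to map() receives one whole SD record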

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        return new SDFRecordReader();
    }
}

The SDFRecordReader class is where all the work is done. We start off by setting some instance variables:

public class SDFRecordReader extends RecordReader<LongWritable, Text> {
    private long end;
    private boolean stillInChunk = true;

    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();

    private byte[] endTag = "$$$$\n".getBytes();

The main method in this class is nextKeyValue(). The code in this method simply reads bytes from the input until it hits the end-of-molecule marker ($$$$), then sets the value to the resultant string and the key to the current position in the file. At this point, it doesn’t really matter what we use for the key, since the mapper will usually work with a molecule identifier and some calculated property. As a result, the reducer will likely never see the key generated in this method.

     public boolean nextKeyValue() throws IOException {
        if (!stillInChunk) return false;

        boolean status = readUntilMatch(endTag, true);
       
        value = new Text();
        value.set(buffer.getData(), 0, buffer.getLength());
        key = new LongWritable(fsin.getPos());
        buffer.reset();

        // status is true as long as we're still within the
        // chunk we got (i.e., fsin.getPos() < end). If we've
        // read beyond the chunk it will be false
        if (!status) {
            stillInChunk = false;
        }

        return true;
    }

The remaining methods are pretty self-explanatory and I’ve included the entire class below.

package net.rguha.dc.io;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class SDFRecordReader extends RecordReader<LongWritable, Text> {
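    // delivers each SD record (everything up to and including the $$$$ delimiter) as a
    // single Text value, keyed by the position in the file at the end of the record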
    private long end;
    private boolean stillInChunk = true;

    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();

    private byte[] endTag = "$$$$\n".getBytes();

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
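        // open the file backing this split and seek to the split's start; if the split
        // doesn't begin at the start of the file, skip past the first $$$$ so that we
        // start reading on a record boundary (the previous split handles the skipped
        // partial record)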
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);

        fsin = fs.open(path);
        long start = split.getStart();
        end = split.getStart() + split.getLength();
        fsin.seek(start);

        if (start != 0) {
            readUntilMatch(endTag, false);
        }
    }

    public boolean nextKeyValue() throws IOException {
        if (!stillInChunk) return false;

        boolean status = readUntilMatch(endTag, true);
       
        value = new Text();
        value.set(buffer.getData(), 0, buffer.getLength());
        key = new LongWritable(fsin.getPos());
        buffer.reset();

        // status is true as long as we're still within the current split
        // (i.e., fsin.getPos() < end); once we've read past it, status is false
        if (!status) {
            stillInChunk = false;
        }

        return true;
    }

    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {
        fsin.close();
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
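        // read bytes one at a time until the match sequence ($$$$ plus a newline) has been
        // seen, copying them into the buffer when withinBlock is true; returns false at end
        // of file, and true only if the match was found before the end of this split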
        int i = 0;
        while (true) {
            int b = fsin.read();
            if (b == -1) return false;
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return fsin.getPos() < end;
                }
            } else i = 0;
        }
    }

}

Using multi-line records

Now that we have classes to handle multi-line records, using them is pretty easy. For example, let’s rework the atom counting example to work with SD file input. In this case, the value argument of the map() method will be a Text object containing the SD record. We simply parse this using the MDLV2000Reader and then proceed as before.

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text haCount = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
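            // value holds the full text of a single SD record, as delivered by SDFRecordReader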
            try {
                StringReader sreader = new StringReader(value.toString());
                MDLV2000Reader reader = new MDLV2000Reader(sreader);
                ChemFile chemFile = (ChemFile)reader.read((ChemObject)new ChemFile());
                List<IAtomContainer> containersList = ChemFileManipulator.getAllAtomContainers(chemFile);
                IAtomContainer molecule = containersList.get(0);
                for (IAtom atom : molecule.atoms()) {
                    haCount.set(atom.getSymbol());
                    context.write(haCount, one);
                }
            } catch (CDKException e) {
                e.printStackTrace();
            }
        }
    }

Before running it, we need to tell Hadoop to use our SDFInputFormat class; this is done in the main program driver:

Configuration conf = new Configuration();
Job job = new Job(conf, "haCount count");
job.setInputFormatClass(SDFInputFormat.class);
...
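
For reference, a complete driver looks much like the main() method of the HeavyAtomCount example from the previous post (reproduced further down this page), with the one extra setInputFormatClass() call. A minimal sketch, with a placeholder driver class name:

// A sketch only: SDAtomCount is a hypothetical driver class; the mapper and
// reducer are assumed to be the SD-aware TokenizerMapper above and the same
// IntSumReducer used in the atom counting example.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "haCount count");
    job.setJarByClass(SDAtomCount.class);
    job.setInputFormatClass(SDFInputFormat.class);   // feed whole SD records to the mapper
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}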

After regenerating the jar file, we’re ready to run. Our SD file contains 100 molecules taken from Pub3D, which we copy into our local HDFS

hadoop dfs -copyFromLocal ~/Downloads/small.sdf input.sdf

and then run our program

hadoop jar rghadoop.jar input.sdf output.sdf

Looking at the output,

$ hadoop dfs -cat output.sdf/part-r-00000
Br  40
C   2683
Cl  23
F   15
H   2379
N   300
O   370
P   1
S   82

we get a count of the elements across the 100 molecules.

With the SDFInputFormat and SDFRecordReader classes, we are now in a position to throw pretty much any cheminformatics task onto a Hadoop cluster. The next post will look at doing SMARTS-based substructure searches using this framework. Future posts will consider the performance gains (or not) of this approach.

Written by Rajarshi Guha

May 4th, 2009 at 8:32 pm

Hadoop and Atom Counting


Over the past few months I’ve been hacking together scripts to distribute data-parallel jobs. However, it’s always nice when somebody else has done the work. In this case, Hadoop is an implementation of the map/reduce framework from Google. As Yahoo and others have shown, it’s an extremely scalable framework, and when coupled with Amazon’s EC2 it makes for a powerful system for processing large datasets.

I’ve been hearing a lot about Hadoop from my brother, who is working on linking R with Hadoop, and I thought this would be a good time to try it out for myself. So the first task was to convert the canonical word counting example to something closer to my interests – counting the occurrence of elements in a collection of SMILES. This is a relatively easy example: since SMILES files are line-oriented, it’s simply a matter of reworking the WordCount example that comes with the Hadoop distribution.

For now, I run Hadoop 0.20.0 on my MacBook Pro, following these instructions on setting up a single-node Hadoop system. I also put the bin/ directory of the Hadoop distribution in my PATH. The code employs the CDK to parse a SMILES string and identify each element.

package net.rguha.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.exception.InvalidSmilesException;
import org.openscience.cdk.interfaces.IAtom;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.cdk.smiles.SmilesParser;

import java.io.IOException;

public class HeavyAtomCount {
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());
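    // a single shared SmilesParser used to convert each SMILES line into an IAtomContainer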

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
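            // each input value is one line of the SMILES file; parse it and emit
            // (element symbol, 1) for every atom in the molecule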
            try {
                IAtomContainer molecule = sp.parseSmiles(value.toString());
                for (IAtom atom : molecule.atoms()) {
                    word.set(atom.getSymbol());
                    context.write(word, one);
                }
            } catch (InvalidSmilesException e) {
                // do nothing for now
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
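            // sum the per-element counts produced by the mappers (and the combiner)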
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(HeavyAtomCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This is compiled in the usual manner and packaged into a jar file. For some reason, Hadoop wouldn’t run the class unless the CDK classes were also included in the jar (i.e., the -libjars argument didn’t seem to let me specify the CDK libraries separately from my own code). So the end result was to include the whole CDK in my Hadoop program jar.

OK, the next thing was to create an input file. I extracted 10,000 SMILES from PubChem and copied them into my local HDFS by

hadoop dfs -copyFromLocal ~/Downloads/pubchem.smi input.smi

Then running my program is simply

hadoop jar rghadoop.jar input.smi output.smi

There’s quite a bit of output, though the interesting portion is

09/05/04 14:45:58 INFO mapred.JobClient: Counters: 17
09/05/04 14:45:58 INFO mapred.JobClient:   Job Counters
09/05/04 14:45:58 INFO mapred.JobClient:     Launched reduce tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Launched map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Data-local map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:   FileSystemCounters
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_READ=533
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_READ=482408
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1098
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=336
09/05/04 14:45:58 INFO mapred.JobClient:   Map-Reduce Framework
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input groups=0
09/05/04 14:45:58 INFO mapred.JobClient:     Combine output records=60
09/05/04 14:45:58 INFO mapred.JobClient:     Map input records=10000
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce shuffle bytes=0
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce output records=0
09/05/04 14:45:58 INFO mapred.JobClient:     Spilled Records=120
09/05/04 14:45:58 INFO mapred.JobClient:     Map output bytes=1469996
09/05/04 14:45:58 INFO mapred.JobClient:     Combine input records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Map output records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input records=60

So we’ve processed 10,000 records, which is good. To see what was generated, we do

hadoop dfs -cat output.smi/part-r-00000

and we get

Ag    13
Al    11
Ar    1
As    6
Au    4
B    49
Ba    7
Bi    1
Br    463
C    181452
Ca    5
Cd    2
Cl    2427
....

Thus, across the entire 10,000 molecules there were 13 occurrences of Ag, 181,452 occurrences of carbon, and so on.

Something useful?

OK, so this is a rather trivial example. But it was quite simple to create, and more importantly, I should be able to take this jar file, run it on a proper multi-node Hadoop cluster, and work with the entire PubChem collection.

A more realistic use case is to do SMARTS searching. In this case, the mapper would simply emit the molecule title along with an indication of whether it matched the supplied pattern (say, 1 for a match, 0 otherwise), and the reducer would simply collect the key/value pairs for which the value was 1. Since this can be done on SMILES input, it is quite simple.
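
As a rough sketch (not code from the post), such a mapper might look like the following. It assumes the CDK’s SMARTSQueryTool (whose constructor signature varies between CDK versions) and that each input line is a SMILES string optionally followed by a molecule title.

    public static class SmartsMatchMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final static IntWritable zero = new IntWritable(0);
        private SmilesParser parser = new SmilesParser(DefaultChemObjectBuilder.getInstance());
        private Text molId = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // assume each input line is "SMILES<whitespace>title"
            String[] tokens = value.toString().trim().split("\\s+", 2);
            try {
                // in a real job the query tool would be created once and the pattern
                // read from the job configuration; it is hard-coded here for brevity
                SMARTSQueryTool queryTool = new SMARTSQueryTool("c1ccccc1");
                IAtomContainer molecule = parser.parseSmiles(tokens[0]);
                molId.set(tokens.length > 1 ? tokens[1] : tokens[0]);
                context.write(molId, queryTool.matches(molecule) ? one : zero);
            } catch (CDKException e) {
                // skip molecules that fail to parse or match
            }
        }
    }

The corresponding reducer would then simply write out the keys whose value is 1.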

A slightly non-trivial task is to apply this framework to SD files. My motivation is that I’d like to run pharmacophore searches across large collections, without having to split up large SD files by hand. Hadoop is an excellent framework for this. The problem is that most Hadoop examples work with line-oriented data files. SD files are composed of multi-line records and so this type of input requires some extra work, which I’ll describe in my next post.

Debugging note

When debugging, it’s useful to edit the Hadoop config file to include

<property>
  <name>mapred.job.tracker</name>
  <value>local</value>
  <description>foo</description>
</property>

so that it runs as a single process.

Written by Rajarshi Guha

May 4th, 2009 at 7:24 pm

Chemistry, Clouds, Collaboration (Part 1)


There’s been an interesting discussion sparked by Deepak’s post asking why there is a much smaller showing of chemists and chemistry applications in the cloud compared to other life science areas. This post led to a FriendFeed thread that raised a number of issues.

At a high level one can easily point out factors such as licensing costs for the tools to do chemistry in the cloud, the lack of standards in datasets and formats, and so on. As Joerg pointed out in the FF thread, IP issues and security are major factors. Even though I’m not a cloud expert, I have read and heard of various cases where financial companies are using clouds. Whether their applications involve sensitive data I don’t know, but it seems that this is one area that is addressable (if not already addressed). As a side note, I was interested to see that Lilly seems to be making a move towards an Amazon-based cloud infrastructure.

But when I read Deepak’s post, the question that occurred to me was: what is the compelling chemistry application that would really make use of the cloud?

While things like molecular dynamics are not going to run too well on a cloud setup, problems that are data parallel can make excellent use of such a setup. Given that, some immediate applications include docking, virtual screening and so on. There have been a number of papers on the use of Grids for docking, so one could easily consider docking in the cloud. Virtual screening (using docking, machine learning, etc.) would be another application.

But the problem I see facing these efforts is that they tend to be project-specific. In contrast, doing something like BLAST in the cloud is more standardized – you send in a sequence and compare it to the usual standard databases of sequences. On the other hand, each docking project is different in terms of the receptor (though there’s less variation there) and the ligand libraries. So on the chemistry side, the input is much larger and more variable.

Similarity searching is another example – one usually searches against a public database or a corporate collection. If these are not in the cloud, making use of the cloud is not very practical. Furthermore, how many different collections should be stored and accessed in the cloud?

Following on from this, one could ask: are chemistry datasets really that large? I’d say no. But I qualify this statement by noting that many projects are quite specific – a single receptor of interest and some focused library. Even if that library is 2 or 3 million compounds, it’s still not very large. For example, while working on the Ugi project with Jean-Claude Bradley I had to dock 500,000 compounds. It took a few days to set up the conformers and then 1.5 days to do the docking, on 8 machines. With the conformers in hand, we can rapidly redock against other targets. But 8 machines is really small. Would I want to do this in the cloud? Sure, if it were set up for me. But I’d still have to transfer 80 GB of data (though Amazon has this now). So the data is not big enough that I can’t handle it.

So this leads to the question: what is big enough to make use of the cloud?

What about really large structure databases, say PubChem and ChemSpider? While Amazon has made progress in this direction by hosting PubChem, chemistry still faces the problem that PubChem is not the whole chemical universe. There will invariably be portions of chemical space that are not represented in a database. On the other hand, a community-oriented database like ChemSpider could take on this role – it already contains PubChem, so one could consider groups putting in their collections of interest (yes, IP is an issue, but I can be hopeful!) and expanding the coverage of chemical space.

So to summarize, why isn’t there more chemistry in the cloud? Some possibilities include

  • Chemistry projects tend to be specific, in the sense that there aren’t a whole lot of “standard” collections
  • Large structure databases are not in the cloud and if they are, still do not cover the whole of chemical space
  • Many chemistry problems are not large in terms of data size, compared to other life science applications
  • Cheminformatics is a much smaller community than bioinformatics, though this applies mainly to non-corporate settings (where the reverse is likely true)

Though I haven’t explicitly talked about the tools, they certainly play a role. While there are a number of Open Source solutions to various cheminformatics problems, many people use commercial tools and will want to use them in the cloud. So one factor that will need to be addressed is the vendors coming on board and supporting cloud-style setups.

Written by Rajarshi Guha

February 22nd, 2009 at 5:00 pm