Hadoop and Atom Counting

Over the past few months I’ve been hacking together scripts to distribute data-parallel jobs. However, it’s always nice when somebody else has done the work. In this case, Hadoop is an implementation of the map/reduce framework from Google. As Yahoo and others have shown, it’s an extremely scalable framework, and when coupled with Amazon’s EC2 it becomes a very powerful system for processing large datasets.

I’ve been hearing a lot about Hadoop from my brother, who is working on linking R with Hadoop, and I thought that this would be a good time to try it out for myself. So the first task was to convert the canonical word-counting example to something closer to my interests – counting the occurrence of elements in a collection of SMILES. This is a relatively easy example, since SMILES files are line-oriented, so it’s simply a matter of reworking the WordCount example that comes with the Hadoop distribution.

For now, I’m running Hadoop 0.20.0 on my MacBook Pro, following these instructions on setting up a single-node Hadoop system. I also put the bin/ directory of the Hadoop distribution in my PATH. The code employs the CDK to parse each SMILES string and identify its elements.

package net.rguha.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.exception.InvalidSmilesException;
import org.openscience.cdk.interfaces.IAtom;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.cdk.smiles.SmilesParser;

import java.io.IOException;

public class HeavyAtomCount {
    static SmilesParser sp = new SmilesParser(DefaultChemObjectBuilder.getInstance());

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            try {
                // parse the SMILES and emit (element symbol, 1) for each heavy atom
                IAtomContainer molecule = sp.parseSmiles(value.toString());
                for (IAtom atom : molecule.atoms()) {
                    word.set(atom.getSymbol());
                    context.write(word, one);
                }
            } catch (InvalidSmilesException e) {
                // do nothing for now
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            // sum the per-element counts emitted by the mappers
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(HeavyAtomCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

This is compiled in the usual manner and converted to a jar file. For some reason, Hadoop wouldn’t run the class unless the CDK classes were also included in the jar file (i.e., the -libjars argument didn’t seem to let me specify the CDK libraries separately from my code). So the end result was to include the whole CDK in my Hadoop program jar.
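A minimal sketch of how this can be done (the jar names and paths below are placeholders, not the exact commands I used): compile against the Hadoop and CDK jars, unpack the CDK classes into the same output tree, and then bundle everything into the job jar with an entry point set in the manifest.

javac -classpath hadoop-0.20.0-core.jar:cdk.jar -d classes src/net/rguha/dc/HeavyAtomCount.java
(cd classes && jar xf ../cdk.jar)                      # unpack the CDK classes alongside my own
jar cfe rghadoop.jar net.rguha.dc.HeavyAtomCount -C classes .   # bundle it all into the job jar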

OK, the next thing was to create an input file. I extracted 10,000 SMILES from PubChem and copied them into my local HDFS with

hadoop dfs -copyFromLocal ~/Downloads/pubchem.smi input.smi

Then running my program is simply

hadoop jar rghadoop.jar input.smi output.smi

There’s quite a bit of output, though the interesting portion is

09/05/04 14:45:58 INFO mapred.JobClient: Counters: 17
09/05/04 14:45:58 INFO mapred.JobClient:   Job Counters
09/05/04 14:45:58 INFO mapred.JobClient:     Launched reduce tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Launched map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:     Data-local map tasks=1
09/05/04 14:45:58 INFO mapred.JobClient:   FileSystemCounters
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_READ=533
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_READ=482408
09/05/04 14:45:58 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1098
09/05/04 14:45:58 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=336
09/05/04 14:45:58 INFO mapred.JobClient:   Map-Reduce Framework
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input groups=0
09/05/04 14:45:58 INFO mapred.JobClient:     Combine output records=60
09/05/04 14:45:58 INFO mapred.JobClient:     Map input records=10000
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce shuffle bytes=0
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce output records=0
09/05/04 14:45:58 INFO mapred.JobClient:     Spilled Records=120
09/05/04 14:45:58 INFO mapred.JobClient:     Map output bytes=1469996
09/05/04 14:45:58 INFO mapred.JobClient:     Combine input records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Map output records=244383
09/05/04 14:45:58 INFO mapred.JobClient:     Reduce input records=60

So we’ve processed 10,000 records, which is good. To see what was generated, we do

hadoop dfs -cat output.smi/part-r-00000

and we get

Ag    13
Al    11
Ar    1
As    6
Au    4
B    49
Ba    7
Bi    1
Br    463
C    181452
Ca    5
Cd    2
Cl    2427
....

Thus across the entire 10,000 molecules, there were 13 occurrences of Ag, 181,452 occurrences of carbon and so on.

Something useful?

OK, so this is a rather trivial example. But it was quite simple to create and, more importantly, I should be able to take this jar file, run it on a proper multi-node Hadoop cluster, and work with the entire PubChem collection.

A more realistic use case is SMARTS searching. In this case, the mapper would emit the molecule title along with an indication of whether the molecule matched the supplied pattern (say, 1 for a match and 0 otherwise), and the reducer would simply collect the key/value pairs for which the value was 1. Since this works directly from SMILES input, it’s easy to set up; a sketch of such a mapper is shown below.
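A minimal sketch of such a mapper, assuming each input line is a SMILES string followed by a tab-separated title and using the CDK’s SMARTSQueryTool (the query is hard-coded here purely for illustration; in practice it would come from the job configuration):

// requires: import org.openscience.cdk.smiles.smarts.SMARTSQueryTool;
public static class SmartsMatchMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final String QUERY = "c1ccccc1";  // placeholder pattern
    private final SmilesParser parser = new SmilesParser(DefaultChemObjectBuilder.getInstance());
    private final Text title = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // assume each line looks like "SMILES<tab>title"
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) return;
        try {
            IAtomContainer molecule = parser.parseSmiles(fields[0]);
            // a real job would build the query tool once in setup() rather than per record
            SMARTSQueryTool sqt = new SMARTSQueryTool(QUERY);
            boolean matched = sqt.matches(molecule);
            title.set(fields[1]);
            context.write(title, new IntWritable(matched ? 1 : 0));
        } catch (Exception e) {
            // skip molecules that fail to parse or match
        }
    }
}

The reducer (or even a simple filter on the job output) would then keep only the titles whose value is 1.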

A slightly non-trivial task is to apply this framework to SD files. My motivation is that I’d like to run pharmacophore searches across large collections, without having to split up large SD files by hand. Hadoop is an excellent framework for this. The problem is that most Hadoop examples work with line-oriented data files. SD files are composed of multi-line records and so this type of input requires some extra work, which I’ll describe in my next post.

Debugging note

When debugging, it’s useful to edit the Hadoop config file to have

<property>
  <name>mapred.job.tracker</name>
  <value>local</value>
  <description>foo</description>
</property>

so that it runs as a single process.

6 thoughts on “Hadoop and Atom Counting”

  1. […] my previous post I had described my initial attempts at working with Hadoop, an implementation of the […]

  2. […] last two posts have described recent attempts at working with Hadoop, a map/reduce framework. As I […]

  3. Rich Apodaca says:

    Rajarshi, very interesting stuff. I hadn’t seen Hadoop before your posts.

    I’m curious, what’s your sense of when Hadoop is not a good solution compared to, say, an RDBMS or a Map Reduce document database like CouchDB?

  4. Rich, good question. However, not having used CouchDB (and also I just started with Hadoop last night), I probably can’t give a very good answer.

    My understanding is that Hadoop is a general purpose framework for map/reduce style tasks. In that sense, one could build something like CouchDB on top of the Hadoop framework.

    However a few points.

    First – from the point of view of computation, Hadoop seems to be preferable, in the sense that one can bring computation to the data. If one were to store the data in an RDBMS, it would imply doing arbitrary computation within the DB (which is unwieldy in many RDBMSs). So if the scenario is such that you are performing arbitrary computations across large datasets and want to process every object, Hadoop seems a good fit.

    (One thing that I will be working on is pharmacophore searches with Hadoop. While one certainly could do this in an RDBMS, I think it’d be a bit of a hack. In contrast, I can simply rework preexisting pharmacophore code into the Hadoop framework with minimal effort.)

    Second – unless we’re talking about parallel RDBMSs, I don’t think it really makes sense to compare. An interesting discussion of map/reduce versus parallel RDBMSs is at http://tinyurl.com/cdy5rl. Of course, if the application doesn’t require any significant relational algebra, then an RDBMS is definitely overkill.

    One of the big differences between an RDBMS and plain old map/reduce might be the way indexing is done – in effect, an RDBMS will pre-compute stuff. I don’t know whether CouchDB can do indexing on arbitrary fields (my understanding is that it basically acts like a big hash table?).

    Not sure whether I answered your question :)

  5. […] the last few posts I’ve described how I’ve gotten up to speed on developing Map/Reduce […]

  6. Deepak says:

    I’ve always wondered and meant to ask, but only just realized (how many times have I read this post?) that Saptarshi is your brother. Geeks run in the family eh :)
