So much to do, so little time

Trying to squeeze sense out of chemical data

HTS and Message Queues

In my previous post I discussed how we’d like to automate some of our screens – starting from the primary screen, going through data processing and compound selection, and completing the secondary (follow-up) screen. A key feature of such a workflow is the asynchronous nature of the individual steps. Messaging and message queues (MQ) provide an excellent approach to handling this type of problem.

Message queue systems

A number of MQ systems are available, such as ActiveMQ and RabbitMQ. See here for a comparison of different MQ systems. Given that we already use Oracle for our backend databases, we use Oracle Advanced Queuing (AQ). One advantage of this is that we can store the messages in the database, allowing us to keep a history of a screen as well as use SQL queries to retrieve messages if desired. Such storage obviously slows things down, but our message throughput is low enough that it doesn’t matter for us.

In this post I’ll briefly describe how I set up a queue on the database side and show the code for a Java application to send a message to the queue and retrieve a message from the queue. The example will actually use the JMS API, which Oracle AQ implements. As a result, the code can trivially swap out AQ for any other JMS implementation.

Creating queues & tables

The first step is to create a queue table and some queues in the database. The PL/SQL to generate these is:

    BEGIN
      DBMS_AQADM.create_queue_table(
        queue_table        => 'test_qt',
        queue_payload_type => 'SYS.AQ$_JMS_MESSAGE');

      DBMS_AQADM.create_queue(
        queue_table    => 'test_qt',
        queue_name     => 'input_q',
        retention_time => DBMS_AQADM.INFINITE);

      DBMS_AQADM.start_queue('input_q');
    END;
    /
    quit

So we’ve created a queue table called test_qt which will hold a queue called input_q. The plan is that we’ll have a process listening on this queue and processing each message as it comes and another process that will send a specified number of messages to the queue. The queue_payload_type argument to the create call indicates that we can store any of the standard JMS message types (though we’ll be focusing on the text message type). We’ve also specified that for the input_q queue, messages will be retained in the database indefinitely. This is useful for debugging and auditing purposes.

Message producers & consumers

OK, with the queues set up, we can now write some Java code to send messages and receive them. In this example, the receiving code will actually run continuously, blocking until messages are received.

This example extends TimerTask. The strategy is that when the listener receives a message, it will create a new instance of this task and schedule it immediately on a new thread. As a result the message processing logic is contained within the run method. At this stage, we only consider messages that are of type TextMessage. If that’s the case we simply extract the payload of the message and print it to STDOUT. You’ll note that we also create a unique listener ID and include that in the output. This is handy when we run multiple listeners and want to check that messages are being received by all of them.

    import java.util.Properties;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.UUID;
    import javax.jms.*;
    import oracle.AQ.AQQueueTable;
    import oracle.jms.AQjmsDestinationProperty;
    import oracle.jms.AQjmsFactory;
    import oracle.jms.AQjmsSession;

    public class QueueExample extends TimerTask {
        static final String URL = "jdbc:oracle:thin:USER/PASSWD@HOST:PORT:SID";

        private Message mesg;

        /* Useful to differentiate between multiple instances of the listener */
        private static final String listenerID = UUID.randomUUID().toString();

        static final String schema = "wtc";
        static final String qTable = "test_qt";
        static final String qName = "input_q";

        static QueueConnection con = null;
        static QueueSession sess = null;
        static javax.jms.Queue q = null;

        public QueueExample(Message m) {
            mesg = m;
        }

        /* Runs on its own thread: print the payload of a text message */
        public void run() {
            try {
                if (!(mesg instanceof TextMessage)) return;
                String payload = ((TextMessage) mesg).getText();
                System.out.println(listenerID + ": Got msg: " + payload);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

Before looking at sending and receiving messages we need to initialize the connection to the message queue:

        private static void initializeQueue() throws JMSException {
            QueueConnectionFactory queue =
                    AQjmsFactory.getQueueConnectionFactory(URL, new Properties());
            // assign to the static field rather than a local variable,
            // so that the connection can be closed later on
            con = (QueueConnection) queue.createConnection();
            con.start();
            sess = (QueueSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            AQQueueTable qtab = ((AQjmsSession) sess).getQueueTable(schema, qTable);
            try {
                q = ((AQjmsSession) sess).getQueue(schema, qName);
            } catch (Exception ex) {
                // the queue could not be retrieved, so create it in the queue table
                AQjmsDestinationProperty props = new AQjmsDestinationProperty();
                q = ((AQjmsSession) sess).createQueue(qtab, qName, props);
            }
        }

The next step is to listen for messages and dispatch them for processing. The method below initializes the queue if it isn’t already initialized. After creating a consumer object, we simply wait for messages to come in. The receive method is blocking, so the program will wait for the next message.
Once a message is received it creates an instance of this class and schedules it – when the thread starts, the run method will execute to process the message.

        public static void listener() throws JMSException {
            if (q == null) initializeQueue();
            System.out.println(listenerID + ": Listening on queue " + q.getQueueName() + "...");
            MessageConsumer consumer = sess.createConsumer(q);

            // each time we get a message, start up the message handler in a new thread
            for (Message m; (m = consumer.receive()) != null;) {
                new Timer().schedule(new QueueExample(m), 0);
            }
            sess.close();
            con.close();
        }

The final component is to send messages. For this simple example, it’s primarily boilerplate code. In this case, we specify how many messages to send. The DeliveryMode.PERSISTENT setting indicates that the messages will be stored (in this case in the DB) until a consumer has received them. Note that after receipt by a consumer the message may or may not be stored in the database. See here for more details.

In the code below, we can set a variety of properties on the message. For example, we’ve set an “application id” (the JMSXAppID property) and a correlation id. Right now, we ignore these, but they can be used to link messages or even link a message to an external resource (though that could also be done via the payload itself). Another useful property that could be set is the message type via setJMSType. Using this one can assign a MIME type to a message, allowing the message processing code to conditionally handle the message based on the type. For more details on the various properties that can be set see the Message documentation.

        public static void sender(int n) throws JMSException {
            if (q == null) initializeQueue();
            MessageProducer producer = sess.createProducer(q);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            Message msg;
            for (int i = 0; i < n; i++) {
                msg = sess.createTextMessage();
                msg.setStringProperty("JMSXAppID", "QueueExample");
                msg.setJMSCorrelationID(UUID.randomUUID().toString());
                ((TextMessage) msg).setText("This is message number " + i);
                producer.send(msg);
            }
            producer.close();
            sess.close();
        }

Running

The complete source code can be found here. To compile it you’ll need an OJDBC jar file as well as the following jar files (that come with the Oracle installation):

• $ORACLE_HOME/rdbms/jlib/aqapi.jar
• $ORACLE_HOME/rdbms/jlib/jmscommon.jar
• $ORACLE_HOME/jlib/jndi.jar
• $ORACLE_HOME/jlib/jta.jar
• $ORACLE_HOME/rdbms/jlib/xdb.jar
• $ORACLE_HOME/lib/xmlparserv2.jar

Once the code has been compiled to a jar file, we first start the listener:

    guhar$ java -jar dist/qex.jar listen
    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Listening on queue input_q...

In another terminal we send some messages:

    guhar$ java -jar dist/qex.jar send 5

Switching to the previous terminal we should see something like:

    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 0
    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 1
    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 2
    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 3
    8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 4

The fun starts when we instantiate multiple listeners (possibly on different machines). It’s simple enough to execute the first invocation above multiple times and watch the output as we send more messages. If you send 10 messages, you should see that some are handled by one listener and the remainder by another one and so on. If the actual message processing is compute intensive, this allows you to easily distribute such loads.
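The way messages get shared out among several listeners can be tried without a database. What follows is a minimal sketch, not the AQ code from this post: it assumes an in-process java.util.concurrent.LinkedBlockingQueue as a stand-in for input_q, threads as stand-ins for separate listener processes, and a made-up "__STOP__" marker to shut the listeners down (none of these are part of JMS or AQ).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-process stand-in for the AQ queue: several listeners compete for messages.
class CompetingListeners {
    // Hypothetical shutdown marker; not part of JMS or AQ.
    private static final String STOP = "__STOP__";

    /** Sends nMessages to nListeners and returns which listener handled which messages. */
    static Map<String, List<String>> run(int nListeners, int nMessages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, List<String>> handled = new ConcurrentHashMap<>();
        List<Thread> listeners = new ArrayList<>();

        for (int i = 0; i < nListeners; i++) {
            String id = "listener-" + i;
            handled.put(id, new CopyOnWriteArrayList<>());
            Thread t = new Thread(() -> {
                try {
                    // take() blocks until a message is available, much like consumer.receive()
                    for (String m; !(m = queue.take()).equals(STOP); ) {
                        handled.get(id).add(m);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            listeners.add(t);
        }

        for (int i = 0; i < nMessages; i++) queue.add("This is message number " + i);
        for (int i = 0; i < nListeners; i++) queue.add(STOP); // one stop marker per listener

        try {
            for (Thread t : listeners) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listeners", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        run(3, 10).forEach((id, msgs) ->
                System.out.println(id + " handled " + msgs.size() + " message(s)"));
    }
}
```

Each message is taken off the queue by exactly one listener, so the per-listener counts always add up to the number of messages sent. How the messages are split between listeners will vary from run to run.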

Next steps

The code discussed here is a minimalistic example of sending and receiving messages from a queue. In the next post, I’ll discuss how we can represent messages in the database using a custom message type (defined in terms of an Oracle ADT) and send and receive such messages using Java. Such custom message types allow the Java code to remain object oriented, with the AQ libraries handling serialization and deserialization of the messages between our code and the queue.

One of the downsides that I see with Oracle AQ is that the only clients supported are PL/SQL, C and Java. While AQ implements the JMS API, it employs its own wire protocol. The lack of support for AMQP means that a lot of client libraries in other languages cannot be used to send or retrieve messages from AQ. If anybody knows of Python packages that work with Oracle AQ I’d love to hear about them. (Looks like stomppy might support AQ?)

Written by Rajarshi Guha

July 11th, 2010 at 9:00 pm

Posted in software


Automating the Screening Pipeline


A key feature of high throughput screening (HTS) efforts is automation. The NCGC is no stranger to automation, with two Kalypsys robots and a variety of automated components such as liquid handlers and so on. But while the screen itself is automated, the transitions between subsequent steps are not. Thus, after a screen is complete, I will be notified that the data is located in some directory. I’ll then load up the data, process it and end up with a set of compounds for follow-up. I’d then send the list of compounds to be plated, which would then be screened in a follow-up assay.

In a number of situations, this approach is unavoidable as the data processing stage requires human intervention (plate corrections, switching controls, etc.). But in some situations, we can automate the whole process – primary screen, automated analysis & compound selection and secondary screen. Given that most screens at NCGC are dose response screens, we can refine an automated pipeline by processing individual plate series (i.e. a collection of plates representing a titration series) rather than waiting for all the plates to be completed. Another important point to note is that the different steps being considered here take different amounts of time. Thus screening a plate series might take 15 minutes, processing the resultant data and making selections would take 3 minutes and performing the secondary screen might take 10 minutes. Clearly the three steps have to proceed in the given order – but we don’t necessarily want to wait for each preceding step to be complete for every plate series. In other words, we need the steps to proceed asynchronously, yet maintain temporal ordering.
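To get a feel for what is gained, here is a back-of-the-envelope sketch using the illustrative timings above (15, 3 and 10 minutes). It rests on simplifying assumptions that are mine rather than a description of the real setup: each step handles one plate series at a time, hand-offs are instantaneous, and the batch workflow runs each step for all plate series before the next step starts. The class and method names are made up for this example.

```java
// Rough comparison of a batch workflow and a per-plate-series pipeline.
class PipelineTiming {
    /** Batch workflow: each step finishes all plate series before the next step starts. */
    static int batchMinutes(int nSeries, int[] stageMinutes) {
        int total = 0;
        for (int t : stageMinutes) total += t * nSeries;
        return total;
    }

    /** Pipelined workflow: a series moves to the next step as soon as that step is free. */
    static int pipelinedMinutes(int nSeries, int[] stageMinutes) {
        int[] freeAt = new int[stageMinutes.length]; // time at which each step is next idle
        for (int s = 0; s < nSeries; s++) {
            int ready = 0; // time at which this series is ready for the next step
            for (int k = 0; k < stageMinutes.length; k++) {
                int start = Math.max(ready, freeAt[k]);
                freeAt[k] = start + stageMinutes[k];
                ready = freeAt[k];
            }
        }
        return freeAt[freeAt.length - 1];
    }

    public static void main(String[] args) {
        int[] stages = {15, 3, 10}; // primary screen, processing, secondary screen
        int n = 4;                  // hypothetical number of plate series
        System.out.println("batch:     " + batchMinutes(n, stages) + " min");
        System.out.println("pipelined: " + pipelinedMinutes(n, stages) + " min");
    }
}
```

Under these assumptions, four plate series take 112 minutes in the batch workflow and 73 minutes when pipelined. Since the primary screen is the slowest step, the later steps never build up a backlog and the pipelined total is simply 15 × 4 + 3 + 10.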

One approach to automating such a process is the use of a message queue (MQ). The fundamental idea behind an MQ is that one creates a queue on some machine and then starts one or more processes (likely on some other machines) to send messages to the queue. These messages can then be retrieved by one or more listener processes. MQ systems provide a number of useful features beyond the core functionality of storing and distributing messages – these include message persistence, security policy, routing, batching and so on.

In our case, when a plate series is screened, the robot sends a message to the queue. Some process will be listening to the queue and when it sees a message, pulls it off the queue and processes the data from the screen for that plate series. Once processing is complete, the process sends another message to the queue (or another queue) from which yet another process (this one running on another robot) can pull it off and start the secondary screen on the selected compounds. Thus, as soon as a plate series is finished in the primary screen, we can start the processing and follow-up, while the next plate series gets started. A message queue approach is also useful since messages can remain on the queue until the appropriate listener pulls them off for processing. A good queue system will ensure that such messages are delivered reliably and don’t get lost.
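The hand-off between the three steps can be sketched in a few lines. This is only an illustration under stated assumptions, not the NCGC infrastructure: two in-process java.util.concurrent queues stand in for the message queues, threads stand in for the robots and the processing job, a message is just a plate series number, the minute timings are scaled down to milliseconds, and -1 is a made-up "no more plates" marker.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-process sketch: two queues connect the three steps of the screening workflow.
class ScreeningPipeline {
    private static final int DONE = -1; // hypothetical end-of-screen marker

    /** Runs nSeries plate series through all three steps and returns the event log. */
    static List<String> run(int nSeries) {
        BlockingQueue<Integer> screened = new LinkedBlockingQueue<>(); // robot -> processing
        BlockingQueue<Integer> selected = new LinkedBlockingQueue<>(); // processing -> follow-up
        List<String> log = new CopyOnWriteArrayList<>();

        Thread primary = new Thread(() -> {
            for (int s = 0; s < nSeries; s++) {
                pause(15);                 // "screen" one plate series
                log.add("primary " + s);
                screened.add(s);           // tell the processing step about it
            }
            screened.add(DONE);
        });
        Thread processing = new Thread(() -> relay(screened, selected, "processed ", 3, log));
        Thread secondary = new Thread(() -> relay(selected, null, "secondary ", 10, log));

        List<Thread> stages = List.of(primary, processing, secondary);
        stages.forEach(Thread::start);
        for (Thread t : stages) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the pipeline", e);
            }
        }
        return log;
    }

    /** Takes series numbers from in, "works" on them, and passes them on to out (if any). */
    private static void relay(BlockingQueue<Integer> in, BlockingQueue<Integer> out,
                              String label, long millis, List<String> log) {
        try {
            for (int s; (s = in.take()) != DONE; ) {
                pause(millis);
                log.add(label + s);
                if (out != null) out.add(s);
            }
            if (out != null) out.add(DONE); // propagate the end marker downstream
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

For any given plate series the log always shows the primary screen first, then processing, then the secondary screen, while entries for different plate series are free to interleave. That is the "asynchronous, yet ordered" behaviour we are after.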

The diagram below highlights this approach. The solid lines represent the traditional workflow. Given that we’d manually process the screening data, we’d wait till all plate series are run. The dashed lines represent a message based workflow, in which we can process each plate series independently.

In the next few posts I’ll describe such a message queue based workflow that I’ve been working on these past few days. Currently it’s specific to a screen that we’re going to be running. The infrastructure is written in Java and makes use of Oracle Advanced Queuing (AQ) to provide message queues and the facilities for receiving and sending messages. I’ll describe a minimal implementation that makes use of the Java Message Service (JMS) and the standard JMS message types and then follow on with an example using a custom message type that maps to an Oracle user defined type, allowing for more “object oriented” messages.

http://en.wikipedia.org/wiki/Liquid_handling_robot

Written by Rajarshi Guha

July 11th, 2010 at 8:31 pm

Posted in software
