In my previous post I discussed how we’d like to automate some of our screens – starting from the primary screen, going through data processing and compound selection and completing the secondary (follow up) screen. A key feature of such a workflow is the asynchronous nature of the individual steps. Messaging and Message queues (MQ) provide an excellent approach to handling this type of problem.
Message queue systems
A number of such MQ systems are available such as ActiveMQ, RabbitMQ and so on. See here for a comparison of different MQ systems. Given that we already use Oracle for our backend databases, we use Oracle Advanced Queue (AQ). One advantage of this is that we can store the messages in the database, allowing us to keep a history of a screen as well as use SQL queries to retrieve messages if desired. Such storage can obviously slows things down, but our message throughput is low enough that it doesn’t matter for us.
In this post I’ll briefly describe how I set up a queue on the database side and show the code for a Java application to send a message to the queue and retrieve a message from the queue. The example will actually use the JMS API, which Oracle AQ implements. As a result, the code can trivially swap out AQ for any other JMS implementation.
Creating queues & tables
The first step is to create a queue table and some queues in the database. The PL/SQL to generate these is
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | BEGIN DBMS_AQADM.create_queue_table( queue_table => 'test_qt', queue_payload_type => 'SYS.AQ$_JMS_MESSAGE'); DBMS_AQADM.create_queue( queue_table => 'test_qt', queue_name => 'input_q', retention_time => DBMS_AQADM.INFINITE); DBMS_AQADM.start_queue('input_q'); END; / quit |
So we’ve created a queue table called test_qt which will hold a queue called input_q. The plan is that we’ll have a process listening on this queue and processing each message as it comes and another process that will send a specified number of messages to the queue. The queue_payload_type argument to the create call, indicates that we can store any of the standard JMS message types (though we’ll be focusing on the text message type). We’ve also specified that for the input_q queue, messages will be retained in the database indefinitely. This is useful for debugging and auditing purposes.
Message producers & consumers
OK, with the queues set up, we can now write some Java code to send messages and receive them. In this example, the receiving code will actually run continuously, blocking until messages are received.
This example extends TimerTask. The strategy is that when the listener receives a message, it will create a new instance of this task and schedule it immediately on a new thread. As a result the message processing logic is contained within the run method. At this stage, we only consider messages that are of type TextMessage. If that’s the case we simply extract the payload of the message and print it to STDOUT.
You’ll note that we also create a unique listener ID and include that in the output. This is handy when we run multiple listeners and want to check that messages are being received by all of them.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public class QueueExample extends TimerTask { static final String URL = "jdbc:oracle:thin:USER/PASSWD@HOST:PORT:SID"; private Message mesg; /* Useful to differentiate between multiple instances of the listener */ private static final String listenerID = UUID.randomUUID().toString(); static final String schema = "wtc"; static final String qTable = "test_qt"; static final String qName = "input_q"; static QueueConnection con = null; static QueueSession sess = null; static javax.jms.Queue q = null; public QueueExample(Message m) { mesg = m; } public void run() { try { if (!(mesg instanceof TextMessage)) return; String payload = ((TextMessage) mesg).getText(); System.out.println(listenerID + ": Got msg: " + payload); } catch (JMSException e) { e.printStackTrace(); } } |
Before looking at sending and receiving messages we need to initialize the connection to the message queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private static void initializeQueue() throws JMSException { QueueConnectionFactory queue = AQjmsFactory.getQueueConnectionFactory(URL, new Properties()); QueueConnection con = (QueueConnection) queue.createConnection(); con.start(); sess = (QueueSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE); AQQueueTable qtab = ((AQjmsSession) sess).getQueueTable(schema, qTable); try { q = ((AQjmsSession) sess).getQueue(schema, qName); } catch (Exception ex) { AQjmsDestinationProperty props = new AQjmsDestinationProperty(); q = ((AQjmsSession) sess).createQueue(qtab, qName, props); } } |
The next step is to listen for messages and dispatch them for processing. The method below initializes the queue if it isn’t already initialized. After creating a consumer object, we simply wait for messages to come in. The receive method is blocking, so the program will wait for the next message. Once a message is received it creates an instance of this class and schedules it – when the thread starts, the run method will execute to process the message.
1 2 3 4 5 6 7 8 9 10 11 12 13 | public static void listener() throws JMSException { if (q == null) initializeQueue(); System.out.println(listenerID + ": Listening on queue " + q.getQueueName() + "..."); MessageConsumer consumer = sess.createConsumer(q); // each time we get a message, start up the message handler in a new thread for (Message m; (m = consumer.receive()) != null;) { new Timer().schedule(new QueueExample(m), 0); } sess.close(); con.close(); } |
The final component is to send messages. For this simple example, it’s primarily boiler plate code. In this case, we specify how many messages to send. The DeliveryMode.PERSISTENT indicates that the messages will be stored (in this case in the DB) until a consumer has received it. Note that after receipt by a consumer the message may or may not be stored in the database. See here for more details.
In the code below, we can set a variety of properties on the message. For example, we’ve set an “application id” (the JMSXAppID property) and a correlation id. Right now, we ignore this, but it can be used to link messages or even link a message to an external resource (though that could also be done via the payload itself). Another useful property that could be set is the message type via setJMSType. Using this one can assign a MIME type to a message allowing the message processing code to conditionally handle the message based on the type. For more details on the various properties that can be set see Message documentation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static void sender(int n) throws JMSException { if (q == null) initializeQueue(); MessageProducer producer = sess.createProducer(q); producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message msg; for (int i = 0; i < n; i++) { msg = sess.createTextMessage(); msg.setStringProperty("JMSXAppID", "QueueExample"); msg.setJMSCorrelationID(UUID.randomUUID().toString()); ((TextMessage) msg).setText("This is message number " + i); producer.send(msg); } producer.close(); sess.close(); } |
Running
The complete source code can be found here. To compile it you’ll need an OJDBC jar file as well as the following jar files (that come with the Oracle installation)
- $ORACLE_HOME/rdbms/jlib/aqapi.jar
- $ORACLE_HOME/rdbms/jlib/jmscommon.jar
- $ORACLE_HOME/jlib/jndi.jar
- $ORACLE_HOME/jlib/jta.jar
- $ORACLE_HOME/rdbms/jlib/xdb.jar
- $ORACLE_HOME/lib/xmlparserv2.jar
Once the code has been compiled to a jar file, we first start the listener:
1 2 | guhar$ java -jar dist/qex.jar listen 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Listening on queue input_q... |
In another terminal we send some messages
1 | guhar$ java -jar dist/qex.jar send 5 |
Switching to the previous terminal we should see something like
1 2 3 4 5 | 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 0 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 1 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 2 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 3 8b9fc2a2-533c-4426-a368-3e6ddfb41587: Got msg: This is message number 4 |
The fun starts when we instantiate multiple listeners (possible on different machines). It’s simple enough to execute the first invocation above multiple times and watch the output as we send more messages. If you send 10 messages, you should see that some are handled by one listener and the remainder by another one and so on. if the actual message processing is compute intensive, this allows you to easily distribute such loads easily.
Next steps
The code discussed here is a minimalistic example of sending and receiving messages from a queue. In the next post, I’ll discuss how we can represent messages in the database using a custom message type (defined in terms of an Oracle ADT) and send and receive such messages using Java. Such custom message types allow the Java code to remain object oriented, with the AQ libraries handling serialization and deserialization of the messages between our code and the queue.
One of the downsides that I see with Oracle AQ is that the only clients supported are PL/SQL, C and Java. While AQ implements the JMS API, it employs its own wire protocol. The lack of support for AMQP means that a lot of client libraries in other languages cannot be used to send or retrieve messages from AQ. If anybody knows of Python packages that work with Oracle AQ I’d love to hear about them. (Looks like stomppy might support AQ?)