PyTPMOTW: py-amqplib

What’s This Module For?

To interact with a queue broker implementing version 0.8 of the Advanced Message Queueing Protocol (AMQP) standard. Copies of various versions of the specification can be found here. At time of writing, 0.10 is the latest version of the spec, but it seems that many popular implementations used in production environments today are still using 0.8, presumably awaiting a finalization of v.1.0 of the spec, which is a work in progress.

What is AMQP?

AMQP is a queuing/messaging protocol that is implemented by server daemons (called ‘brokers’) like RabbitMQ, ActiveMQ, Apache Qpid, Red Hat Enterprise MRG, and OpenAMQ. Though messaging protocols used in the enterprise are historically proprietary, AMQP has a bold and vocal stance that AMQP will be:

  • Broadly applicable for enterprise use
  • Totally open
  • Platform agnostic
  • Interoperable

The working group consists of several huge enterprises who have a vested interest in a protocol that meets these requirements. Most are either huge enterprises who are (or were) a victim of the proprietary lock-in that came with what will now likely become ‘legacy’ protocols, or implementers of the protocols, who will sell products and services around their implementation. Here’s a brief list of those involved in the AMQP working group:

  • JPMorgan Chase (the initial developers of the protocol, along with iMatix)
  • Goldman Sachs
  • Red Hat Software
  • Cisco Systems
  • Novell

Message brokers can facilitate an awfully large amount of flexibility in an architecture. They can be used to integrate applications across platforms and languages, enable asynchronous operations for web front ends, modularize and more easily distribute complex processing operations.

Basic Publishing

The first thing to know is that when you code against an AMQP broker, you’re dealing with a hierarchy: a ‘vhost’ contains one or more ‘exchanges’ which themselves can be bound to one or more ‘queues’. Here’s how you can programmatically create an exchange and queue, bind them together, and publish a message:

from amqplib import client_0_8 as amqp

conn = amqp.Connection(userid='guest', password='guest', host='localhost', virtual_host='/', ssl=False)

# Create a channel object, queue, exchange, and binding.
chan = conn.channel()
chan.queue_declare('myqueue', durable=True)
chan.exchange_declare('myexchange', type='direct', durable=True)
chan.queue_bind('myqueue', 'myexchange', routing_key='myq.myx')

# Create an AMQP message object

msg = amqp.Message('This is a test message')
chan.basic_publish(msg, 'myexchange', 'myq.myx')

As far as we know, we have one exchange and one queue on our server right now, and if that’s the case, then technically the routing key I’ve used isn’t required. However, I strongly suggest that you always use a routing key to avoid really odd (and implementation-specific) behavior like getting multiple copies of a message on the consumer side of the equation, or getting odd exceptions from the server. The routing key can be arbitrary text like I’ve used above, or you can use a common formula of using ‘.’ as your routing key. Just remember that without the routing key, the minute more than one queue is bound to an exchange, the exchange has no way of knowing which queue to route a message to. Remeber: you don’t publish to a queue, you publish to an exchange and tell it which queue it goes in via the routing key.

Basic Consumption

Now that we’ve published a message, how do we get our hands on it? There are two methods: basic_get, which will ‘get’ a single message from the queue, or ‘basic_consume’, which technically doesn’t get *any* messages: it registers a handler with the server and tells it to send messages along as they arrive, which is great for high-volume messaging operations.

Here’s the ‘basic_get’ version of a client to grab the message we just published:

msg = chan.basic_get(queue='myqueue', no_ack=False)
chan.basic_ack(msg.delivery_tag)

In the above, I’ve used the same channel I used to publish the message to get it back again using the basic_get operation. I then acknowledged receipt of the message by sending the server a ‘basic_ack’, passing along the delivery_tag the server included as part of the incoming message.

Consuming Mass Quantities

Using basic_consume takes a little more thought than basic_get, because basic_consume does nothing more than register a method with the server to tell it to start sending messages down the pipe. Once that’s done, however, it’s up to you to do a chan.wait() to wait for messages to show up, and find some elegant way of breaking out of this wait() operation. I’ve seen and used different techniques myself, and the right thing will depend on the application.

The basic_consume method also requires a callback method which is called for each incoming message, and is passed the amqp.Message object when it arrives.

Here’s a bit of code that defines a callback method, calls basic_consume, and does a chan.wait():

consumer_tag = 'foo'
def process(msg):
   txt = msg.body
   if '-1' in txt:
      print 'Got -1'
      chan.basic_cancel(consumer_tag)
      chan.close()
   else: 
      print 'Got message!'

chan.basic_consume('messages', callback=process, consumer_tag=consumer_tag)
while True:
   print 'Message processed. Next?'
   try:
      chan.wait()
   except IOError as out:
      print "Got an IOError: %s" % out
      break
   if not chan.is_open:
      print "Done processing. Later"
      break

So, basic_consume tells the server ‘Start sending any and all messages!’. The server registers a method with a name given by the consumer_tag argument, or it assigns one and it becomes the return value of basic_consume(). I define one here because I don’t want to run into race conditions where I want to call basic_cancel() with a consumer_tag variable that doesn’t exist yet, or is out of scope, or whatever. In the callback, I look for a sentinel message whose body contains ‘-1′, and at that point I call basic_cancel (passing in the consumer_tag so the server knows who to stop sending messages to), and I close the channel. In the ‘while True’, the channel object checks its status and exits if it’s not open.

The above example starts to uncover some issues with py-amqplib. It’s not clear how errors coming back from the server are handled, as opposed to errors caused by the processing code, for example. It’s also a little clumsy trying to determine the logic for breaking out of the loop. In this case there’s a sentinel message sent to the queue representing the final message on the stack, at which point our ‘process()’ callback closes the channel, but then the channel has to check its own status to move forward. Just returning False from process() doesn’t break out of the while loop, because it’s not looking for a return value from that function. We could have our process() function raise an error of its own as well, which might be a bit more elegant, if also a bit more work.

Moving Ahead

What I’ve covered here actually covers perhaps 90% of the common cases for amqplib, but there’s plenty more you can do with it. There are various exchange types, including fanout exchanges and topic exchanges, which can facilitate more interesting messaging and pub/sub models. To learn more about them, here are a couple of places to go for information:

Broadcasting your logs with RabbitMQ and Python
Rabbits and Warrens
RabbitMQ FAQ section “Messaging Concepts: Exchanges