RabbitMQ Gotchas

When you’re running a cloud service like we do here at Vena, uptime is very important. We run blue-green deployments and we strive to make all database schema changes backwards compatible, so server upgrades require no downtime and rollbacks are equally painless. The end result is that our users never even know when we’ve done an upgrade.

One of the features that requires special handling is background jobs. In the Vena Cloud app, we’ve productized the ability for end users to start potentially long-running data-crunching jobs and get notified when they finish. These jobs run as background threads on our servers. At some point, the servers running these jobs will need to shut down, for example during an upgrade. We don’t want to keep the old generation of servers running longer than necessary — it’s a waste of money and resources — so we transition the jobs to the new servers during the deployment process (in some cases, the jobs may even see a performance boost thanks to the new servers/code!). To transition a job, the old server stops its work and creates a resume point, and then we pass a message to a new server to continue from that point. Since a “new” server in one deployment becomes the “old” server in the next, each server has a flag that designates whether it should receive these messages or not.
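
As a rough illustration, the handoff on the old server might look like the sketch below. The Job, ResumePoint, and messageBus names are hypothetical, for illustration only, not our actual code.

// Hypothetical sketch of the handoff on the old server.
void handOffJob(Job job) throws Exception {
    job.stop();                                   // stop work at a safe point
    ResumePoint resumePoint = job.createResumePoint();
    messageBus.send(resumePoint.serialize());     // a "new" server picks this up
                                                  // and continues the job
}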

Simple, right? Well, it turns out even the simplest designs can have problems if you don’t consider all the edge cases. We chose RabbitMQ, a widely-used implementation of the AMQP protocol, to pass these messages. Unfortunately, despite our best efforts and testing in our staging environment, soon after we deployed this solution we started seeing weird behaviour: jobs were randomly not being resumed.

An Unexpected Delivery

Consider the following Java code for receiving messages from a RabbitMQ work queue:

while (!Thread.interrupted()) {  
    try {
        if (!connected) connect();

        // Block here until this server is flagged to accept messages
        synchronized(lock) {
            while(!acceptMessages) {
                lock.wait();
            }
        }
        // Wait for a message
        Delivery delivery = consumer.nextDelivery(TIMEOUT);

        handle(delivery);
    }
    catch (Exception e) {
        // error handling
    }
}
disconnect();  

This code is meant to run forever unless the thread gets interrupted. It opens a connection and channel to RabbitMQ and registers a consumer once at the start, reconnects if there is an error or a connection loss, but otherwise re-uses the same connection/channel/consumer for as long as it runs.
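
For context, connect() is not shown above; with the older QueueingConsumer API it might look roughly like this. The queue name and the fields (rabbitHost, connection, channel, consumer, consumerTag, connected) are illustrative, not our exact code.

// Rough sketch of connect() using the older QueueingConsumer API.
private void connect() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(rabbitHost);

    connection = factory.newConnection();
    channel = connection.createChannel();
    channel.queueDeclare("job-resume-queue", true, false, false, null);

    consumer = new QueueingConsumer(channel);
    // Note the second argument: auto-ack is on. This turns out to matter.
    consumerTag = channel.basicConsume("job-resume-queue", true, consumer);

    connected = true;
}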

Let’s say we have four servers running this code. Two of the servers (S1 and S2) have the volatile flag acceptMessages set to true, and the other two (S3 and S4) have it set to false. In other words, S1 and S2 are waiting for messages in consumer.nextDelivery(), while S3 and S4 are waiting for acceptMessages to become true in lock.wait().
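
The flag itself is flipped by the deployment process; a minimal sketch of that toggle (the method name is ours, for illustration) could look like this:

// Hypothetical sketch of how the deployment process flips the flag.
// Notifying the lock wakes a waiting server so it can start consuming.
void setAcceptMessages(boolean accept) {
    synchronized (lock) {
        acceptMessages = accept;
        lock.notifyAll();
    }
}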

Pop Quiz! What happens when 8 new messages are pushed onto the queue? (Choose one:)

  (a) the messages are received by S1 and S2 as they request them, in first-come, first-served (FCFS) order.
  (b) the messages are distributed evenly between S1 and S2 in round-robin order.
  (c) the messages are distributed evenly among all four servers in round-robin order.
  (d) the first two messages are distributed to S1 and S2 and then nothing else, because S3 is not available to receive the 3rd message.
  (e) the queue crashes.
  (f) none of the above.

If you said (a), you would be wrong. RabbitMQ distributes queue messages in round-robin order. If you said (b), you would also be wrong. In fact, RabbitMQ distributes messages among all the connected consumers, even if those consumers have not explicitly called nextDelivery(). Thus, the answer is (c). A quote from the RabbitMQ tutorial:

[By default,] RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

In this scenario, S3 and S4 have not disconnected from the queue, and hence are in line to receive half of the messages. Servers S1 and S2 will receive messages 1, 2, 5, 6 and process them, but messages 3, 4, 7, 8 sent to S3 and S4 are stuck in limbo and never processed by anyone.

“But, wait! RabbitMQ is a reliable and robust queue,” you say. “Messages should never be lost!” It turns out our second mistake was leaving auto-acknowledgement on instead of acknowledging messages manually. With auto-ack on, even after we disconnect and shut down S3 and S4, their messages are never re-delivered — they are lost forever!

What Should We Do?

Fortunately, the fixes here are simple. First, when a consumer wants to stop listening to the queue, it needs to disconnect from the queue. Second, we should never use auto-ack for a work queue.

The first change is the ideal fix, as it tells RabbitMQ immediately to stop dispatching messages to that consumer. However, when a server hangs, doesn’t shut down cleanly, or loses its network connection, that might not be possible. The second change ensures that, if this happens, the message RabbitMQ originally dispatched to the dead consumer gets re-delivered to a live one once RabbitMQ realizes the original consumer is dead (i.e. it never acked the message).

Our final solution looks something like this:

while (!Thread.interrupted()) {
    try {
        synchronized(lock) {
            while(!acceptMessages) {
                // Disconnect so RabbitMQ stops dispatching messages to us
                // while we are not accepting them
                disconnect();
                lock.wait();
            }
        }
        if (!connected) connect();

        // Wait for a message (nextDelivery returns null on timeout)
        Delivery delivery = consumer.nextDelivery(TIMEOUT);

        if (delivery != null) {
            try {
                handle(delivery);
            } finally {
                // Manual acknowledgement; always sent, even if handle() threw
                // (see the note on acking below)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                    false);
            }
        }
    }
    catch (Exception e) {
        // error handling
    }
}
disconnect();
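
The disconnect() helper is what tells RabbitMQ to stop dispatching to this consumer. A rough sketch, assuming connect() now passes false for the autoAck argument of basicConsume and keeps the consumer tag it returns:

// Rough sketch of disconnect(); field names are illustrative.
private void disconnect() {
    if (!connected) return;
    try {
        channel.basicCancel(consumerTag);  // stop deliveries to this consumer
        channel.close();
        connection.close();
    } catch (Exception e) {
        // log and carry on; this consumer is going away regardless
    } finally {
        connected = false;
    }
}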

A Final Note on Acking

We briefly considered whether we should still ack a message if our handler threw an exception while processing it. In theory, servers running different code could be listening to the same queue, and some of them might be able to handle a message that another couldn’t. Should the one that couldn’t handle the message send a negative ack, forcing RabbitMQ to re-queue and re-deliver it later? The downside is that if no one can handle that message, it stays on the queue forever and you have a much bigger problem. There isn’t much literature on this subject, but we decided to err on the safe side and always ack, even if we hit an error.
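
For reference, the re-queue alternative we decided against would have replaced the delivery-handling block inside the loop above with something like this sketch:

// Sketch of the rejected alternative: negatively acknowledge on failure so
// RabbitMQ re-queues the message and delivers it to another consumer.
if (delivery != null) {
    try {
        handle(delivery);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // requeue = true puts the message back on the queue; if no consumer
        // can ever handle it, it bounces around forever.
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
}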

Posted by Albert Choi

Senior Software Architect