Consumer API

The Consumer, PublishingConsumer, SmartConsumer, and SmartPublishingConsumer classes are the base classes to extend when writing consumer applications.

While the Consumer class provides all the structure required for implementing a rejected consumer, the SmartConsumer adds functionality designed to make writing consumers even easier. When a consumer extending SmartConsumer receives a message whose content_type property contains one of the supported MIME types, the message body is automatically deserialized and the deserialized value is made available via the body attribute. Additionally, if the message's content_encoding property specifies one of the supported encodings (gzip or bzip2), the body is automatically decoded.
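The decode-then-deserialize behavior described above can be sketched with the standard library alone. This is only an illustration, not the code rejected uses: the decode_body helper is hypothetical, and application/json is assumed to be one of the supported MIME types.

```python
import bz2
import gzip
import json

# Hypothetical lookup of content_encoding values to decompression functions.
DECODERS = {"gzip": gzip.decompress, "bzip2": bz2.decompress}


def decode_body(raw, content_type=None, content_encoding=None):
    """Return the body after optional decompression and deserialization."""
    decoder = DECODERS.get(content_encoding)
    if decoder is not None:
        raw = decoder(raw)
    # Assumption: only JSON is handled in this sketch.
    if content_type == "application/json":
        return json.loads(raw.decode("utf-8"))
    return raw


payload = gzip.compress(json.dumps({"id": 7}).encode("utf-8"))
body = decode_body(payload, "application/json", "gzip")
```

A body with no recognized content_type or content_encoding passes through this helper unchanged.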

Message Type Validation

In any of the consumer base classes, if the MESSAGE_TYPE attribute is set, the type property of each incoming message is checked for string equality against the MESSAGE_TYPE attribute when the message is received. If the values do not match, the consumer will not process the message. If the DROP_INVALID_MESSAGES attribute is set to True, the message is dropped without an exception. If it is False, a ConsumerException is raised.
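As a rough model of that check, assuming a hypothetical validate_message_type helper and a local stand-in for the exception class (neither is taken from rejected itself):

```python
class ConsumerException(Exception):
    """Local stand-in for rejected.consumer.ConsumerException."""


def validate_message_type(expected, actual, drop_invalid):
    """Return True to process the message, False to drop it silently."""
    # No MESSAGE_TYPE configured, or an exact string match: process it.
    if expected is None or actual == expected:
        return True
    # Mismatch with DROP_INVALID_MESSAGES set: drop without raising.
    if drop_invalid:
        return False
    # Mismatch without DROP_INVALID_MESSAGES: signal a consumer error.
    raise ConsumerException(
        "expected type {!r}, got {!r}".format(expected, actual))
```

Here expected plays the role of MESSAGE_TYPE and drop_invalid the role of DROP_INVALID_MESSAGES.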

Exceptions

There are two exception types that consumer applications should raise to handle problems that may arise when processing a message. When these exceptions are raised, rejected will reject the message delivery, letting RabbitMQ know that there was a failure.

The ConsumerException should be raised when there is a problem in the consumer itself, such as an inability to contact a database server or other resources. When a ConsumerException is raised, the message will be rejected and requeued, adding it back to the RabbitMQ queue it was delivered from. Additionally, rejected keeps track of consumer exceptions and will shut down the consumer process and start a new one once a consumer has exceeded its configured maximum error count within a 60 second window. The default maximum error count is 5.
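One way to picture the error-count rule is a sliding window over error timestamps. The ErrorWindow class below is a hypothetical sketch, not rejected's bookkeeping, and it assumes "exceeded" means strictly more than the maximum.

```python
import time
from collections import deque


class ErrorWindow:
    """Count errors that occurred within the last `window` seconds."""

    def __init__(self, max_errors=5, window=60.0):
        self.max_errors = max_errors
        self.window = window
        self._times = deque()

    def record(self, now=None):
        """Record one error; return True once the count exceeds the maximum."""
        now = time.monotonic() if now is None else now
        self._times.append(now)
        # Discard errors that have aged out of the window.
        while self._times and now - self._times[0] > self.window:
            self._times.popleft()
        return len(self._times) > self.max_errors


errors = ErrorWindow()
should_restart = errors.record()
```

With the defaults, a sixth error inside the same 60 seconds is the first call that returns True, while errors spaced more than a minute apart never trigger a restart.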

The MessageException should be raised when there is a problem with the message itself, such as a malformed payload or unsupported properties like content-type or type. When this exception is raised, the message will be rejected on the RabbitMQ server without requeue, discarding the message.

If a consumer raises a ProcessingException, the message that was being processed will be republished to the exchange specified by the ERROR_EXCHANGE attribute of the consumer's class, using the routing key that was last used for the message. The original message body and properties will be used, and an additional header, X-Processing-Exceptions, will be added that contains the number of times a ProcessingException has been raised for the message. In combination with a queue that has x-message-ttl set and an x-dead-letter-exchange that points to the original exchange for the queue the consumer is consuming from, you can implement a delayed retry cycle for messages that are failing to process due to external resource or service issues.
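The retry queue in that cycle is an ordinary queue declared with two RabbitMQ arguments. A minimal sketch of building them follows; the helper name, the 5 second delay, and the exchange name are all placeholders chosen for illustration.

```python
def retry_queue_arguments(ttl_ms, original_exchange):
    """Build queue arguments for a delayed-retry queue.

    Messages wait in the queue for ttl_ms milliseconds, then RabbitMQ
    dead-letters them to original_exchange for another attempt.
    """
    return {
        "x-message-ttl": ttl_ms,
        "x-dead-letter-exchange": original_exchange,
    }


# Placeholder values: a 5 second delay and a hypothetical exchange name.
arguments = retry_queue_arguments(5000, "events")
```

A queue declared with these arguments and bound to the ERROR_EXCHANGE holds each failed message for the configured delay before it is routed back to the original exchange.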

If ERROR_MAX_RETRY is set on the class, the headers of each message will be inspected, and if the value of X-Processing-Exceptions is greater than or equal to the ERROR_MAX_RETRY value, the message will be dropped.
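The interaction between the X-Processing-Exceptions header and ERROR_MAX_RETRY can be modeled as a small pure function. The function name and the returned action strings are hypothetical, and the exact point at which rejected performs the check is not specified here.

```python
HEADER = "X-Processing-Exceptions"


def handle_processing_exception(headers, error_max_retry=None):
    """Return (action, headers) for a message that failed processing."""
    count = int(headers.get(HEADER, 0))
    # Drop once the recorded failure count reaches the retry limit.
    if error_max_retry is not None and count >= error_max_retry:
        return "drop", headers
    # Otherwise republish with the failure count incremented.
    updated = dict(headers)
    updated[HEADER] = count + 1
    return "republish", updated


action, new_headers = handle_processing_exception({}, error_max_retry=3)
```

When error_max_retry is None, which models ERROR_MAX_RETRY being unset, the sketch always republishes.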

Note

If unhandled exceptions are raised by a consumer, they will be caught by rejected, logged, and turned into a ConsumerException.
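Taken together, the handling described in this section can be summarized as a dispatch on the exception type. The classes below are local stand-ins, the outcome strings are only labels, and acknowledging a message on success is an assumption rather than something stated in this section.

```python
class ConsumerException(Exception):
    """Local stand-in for rejected.consumer.ConsumerException."""


class MessageException(Exception):
    """Local stand-in for rejected.consumer.MessageException."""


class ProcessingException(Exception):
    """Local stand-in for rejected.consumer.ProcessingException."""


def outcome_for(process):
    """Run process() and describe what happens to the message."""
    try:
        process()
    except MessageException:
        return "reject without requeue"
    except ProcessingException:
        return "republish to ERROR_EXCHANGE"
    except ConsumerException:
        return "reject and requeue"
    except Exception:
        # Unhandled exceptions are treated like a ConsumerException.
        return "reject and requeue"
    # Assumption: a message processed without error is acknowledged.
    return "ack"


def failing_process():
    raise ValueError("unexpected failure")


result = outcome_for(failing_process)
```

In this sketch an unexpected ValueError ends up on the same path as an explicit ConsumerException.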

class rejected.consumer.ConsumerException

May be raised when processing a message to indicate a problem that the Consumer is experiencing that should cause it to stop.

class rejected.consumer.MessageException

Raise when a message should be rejected and not requeued, but not due to a processing error that should cause the consumer to stop.

class rejected.consumer.ProcessingException

Raise when a message should be rejected and not requeued, but not due to a processing error that should cause the consumer to stop. Use this when you want the rejected message to be republished to a retry queue, without implying anything about the cause of the exception.