Consumer Examples

The following example illustrates a very simple consumer that simply logs each message body as it’s received.

from rejected import consumer
import logging

__version__ = '1.0.0'

LOGGER = logging.getLogger(__name__)


class ExampleConsumer(consumer.Consumer):

    def process(self):
        LOGGER.info(self.body)

All interaction with RabbitMQ with regard to connection management and message handling, including acknowledgements and rejections are automatically handled for you.

The __version__ variable provides context in the rejected log files when consumers are started and can be useful for investigating consumer behaviors in production.

In this next example, a contrived ExampleConsumer._connect_to_database method is added that will return False. When ExampleConsumer.process evaluates if it could connect to the database and finds it can not, it will raise a rejected.consumer.ConsumerException which will requeue the message in RabbitMQ and increment an error counter. When too many errors occur, rejected will automatically restart the consumer after a brief quiet period. For more information on these exceptions, check out the consumer API documentation.

from rejected import consumer
import logging

__version__ = '1.0.0'

LOGGER = logging.getLogger(__name__)


class ExampleConsumer(consumer.Consumer):

    def _connect_to_database(self):
        return False

    def process(self):
        if not self._connect_to_database:
            raise consumer.ConsumerException('Database error')

        LOGGER.info(self.body)

Some consumers are also publishers. In this next example, the message body will be republished to a new exchange on the same RabbitMQ connection:

from rejected import consumer
import logging

__version__ = '1.0.0'

LOGGER = logging.getLogger(__name__)


class ExampleConsumer(consumer.PublishingConsumer):

    def process(self):
        LOGGER.info(self.body)
        self.publish('new-exchange', 'routing-key', {}, self.body)

Note that the previous example extends rejected.consumer.PublishingConsumer instead of rejected.consumer.Consumer. For more information about what base consumer classes exist, be sure to check out the consumer API documentation.