Testing API

Rejected’s testing API extends Tornado’s testing module API to make writing consumer application tests easy to write without having to create your own mocks for Rejected’s consumer application contracts.


All of your test methods when using the functions in AsyncTestCase need to use the @testing.gen_test decorator.


rejected.testing namespaced import of Tornado’s @tornado.testing.gen_test decorator.


from rejected import testing

import my_consumer

class ConsumerTestCase(testing.AsyncTestCase):

   def get_consumer(self):
       return my_consumer.Consumer

   def test_consumer_raises_message_exception(self):
       result = yield self.process_message({'foo': 'bar'})

Base Test Case

Extend AsyncTestCase to implement your consumer tests.

class rejected.testing.AsyncTestCase(*args, **kwargs)[source]

tornado.testing.AsyncTestCase subclass for testing Consumer classes. tornado.testing.AsyncTestCase is a subclass of unittest.TestCase


The unittest framework is synchronous, so the test must be complete by the time the test method returns. This means that asynchronous code cannot be used in quite the same way as usual.

To write test functions that use the same yield-based patterns used with the tornado.gen module, decorate your test methods with @rejected.testing.gen_test instead of @tornado.gen.coroutine. This class also provides the stop and wait methods for a more manual style of testing. The test method itself must call self.wait(), and asynchronous callbacks should call self.stop() to signal completion.

By default, a new IOLoop is constructed for each test and is available as self.io_loop. This IOLoop should be used in the construction of HTTP clients/servers, etc. If the code being tested requires a global IOLoop, subclasses should override get_new_ioloop to return it.

The IOLoop’s start and stop methods should not be called directly. Instead, use self.stop and self.wait. Arguments passed to self.stop are returned from self.wait. It is possible to have multiple wait/stop cycles in the same test.

Consumer Testing Functions


Override to return the consumer class for testing.

Return type:class

Override this method to provide settings to the consumer during construction. These settings should be from the config stanza of the Consumer configuration.

Return type:dict
create_message(message, properties=None, exchange='rejected', routing_key='test')[source]

Create a message instance for use with the consumer in testing.

  • message (any) – the body of the message to create
  • properties (dict) – AMQP message properties
  • exchange (str) – The exchange the message should appear to be from
  • routing_key (str) – The message’s routing key
Return type:


process_message(message_body=None, content_type='application/json', message_type=None, properties=None, exchange='rejected', routing_key='routing-key')[source]

Process a message as if it were being delivered by RabbitMQ. When invoked, an AMQP message will be locally created and passed into the consumer. With using the default values for the method, if you pass in a JSON serializable object, the message body will automatically be JSON serialized.

If an exception is not raised, a Measurement instance is returned that will contain all of the measurements collected during the processing of the message.


class ConsumerTestCase(testing.AsyncTestCase):

    def test_consumer_raises_message_exception(self):
        with self.assertRaises(consumer.MessageException):
            yield self.process_message({'foo': 'bar'})


This method is a co-routine and must be yielded to ensure that your tests are functioning properly.

  • message_body (any) – the body of the message to create
  • content_type (str) – The mime type
  • message_type (str) – identifies the type of message to create
  • properties (dict) – AMQP message properties
  • exchange (str) – The exchange the message should appear to be from
  • routing_key (str) – The message’s routing key

AssertionError when an unhandled exception is raised







Return type:



Return a list of PublishedMessage that are extracted from all calls to basic_publish that are invoked during the test. The properties attribute is the pika.spec.BasicProperties instance that was created during publishing.

New in version 3.18.9.


Assign a callable (lambda function, method, etc) to invoke for each published message. Will be invoked with 4 arguments: Exchange (str), Routing Key (str), AMQP Message Properties (pika.spec.BasicProperties), and Body (bytes).

Raise a rejected.testing.UnroutableMessage exception to trigger the message to be returned to the Consumer as unroutable.

Raise a rejected.testing.UndeliveredMessage exception to trigger an undelivered confirmation message when publisher confirmations are enabled.

New in version 4.0.0.

Parameters:func (callable) – The function / method to execute

Tornado AsyncTestCase Specific Functions


Creates a new .IOLoop for this test. May be overridden in subclasses for tests that require a specific .IOLoop (usually the singleton .IOLoop.instance()).

stop(_arg=None, **kwargs)[source]

Stops the .IOLoop, causing one pending (or future) call to wait() to return.

Keyword arguments or a single positional argument passed to stop() are saved and will be returned by wait().

wait(condition=None, timeout=None)[source]

Runs the .IOLoop until stop is called or timeout has passed.

In the event of a timeout, an exception will be thrown. The default timeout is 5 seconds; it may be overridden with a timeout keyword argument or globally with the ASYNC_TEST_TIMEOUT environment variable.

If condition is not None, the .IOLoop will be restarted after stop() until condition() returns true.

Changed in version 3.1: Added the ASYNC_TEST_TIMEOUT environment variable.

Test Setup and Tear down


Method called to prepare the test fixture. This is called immediately before calling the test method; other than AssertionError or unittest.SkipTest, any exception raised by this method will be considered an error rather than a test failure.


If you extend this method, you MUST invoke super(YourAsyncTestCase, self).setUp() to properly setup the test case.


Method called immediately after the test method has been called and the result recorded. This is called even if the test method raised an exception, so the implementation in subclasses may need to be particularly careful about checking internal state. Any exception, other than AssertionError or unittest.SkipTest, raised by this method will be considered an additional error rather than a test failure (thus increasing the total number of reported errors).

This method will only be called if the setUp succeeds, regardless of the outcome of the test method.


If you extend this method, you MUST invoke super(YourAsyncTestCase, self).tearDown() to properly tear down the test case.


For additional methods available in this class, see the unittest.TestCase documentation.

Test Results

To test that the messages your consumer may be publishing are correct, any calls to Consumer.publish_message are recorded to AsyncTestCase.published_messages as a list of PublishedMessage objects.

class rejected.testing.PublishedMessage(exchange, routing_key, properties, body, delivered=None)[source]

Contains information about messages published during a test when using rejected.testing.AsyncTestCase.

  • exchange (str) – The exchange the message was published to
  • routing_key (str) – The routing key the message was published with
  • properties (pika.spec.BasicProperties) – AMQP message properties
  • body (bytes) – AMQP message body
  • delivered (bool) – Indicates if the message was delivered when Publisher Confirmations are enabled

New in version 3.18.9.


The following exceptions are available for use in testing via the publishing_side_effect method:

class rejected.testing.UndeliveredMessage[source]

Raise as a side effect of rejected.testing.AsyncTestCase with publishing_side_effect to test negative acknowledgements when using publisher confirmations.

class rejected.testing.UnroutableMessage[source]

Raise as a side effect of rejected.testing.AsyncTestCase with publishing_side_effect to test branches dealing with messages returned by RabbitMQ as unroutable when using publisher confirmations.


Tornado’s tornado.gen module is imported into the rejected.testing module as rejected.testing.gen for your convenience.

Methods of note are:


Return a .Future that resolves after the given number of seconds.

When used with yield in a coroutine, this is a non-blocking analogue to time.sleep (which should not be used in coroutines because it is blocking):

yield gen.sleep(0.5)

Note that calling this function on its own does nothing; you must wait on the .Future it returns (usually by yielding it).

New in version 4.1.


For additional methods available in this namespace, see the tornado.gen documentation.