Source code for rejected.testing

"""
The :class:`rejected.testing.AsyncTestCase` provides a based class for the
easy creation of tests for your consumers. The test cases exposes multiple
methods to make it easy to setup a consumer and process messages. It is
build on top of :class:`tornado.testing.AsyncTestCase` which extends
:class:`unittest.TestCase`.

To get started, override the
:meth:`rejected.testing.AsyncTestCase.get_consumer` method.

Next, the :meth:`rejected.testing.AsyncTestCase.get_settings` method can be
overridden to define the settings that are passed into the consumer.

Finally, to invoke your Consumer as if it were receiving a message, the
:meth:`~rejected.testing.AsyncTestCase.process_message` method should be
invoked.

.. note:: Tests are asynchronous, so each test should be decorated with
            :meth:`~rejected.testing.gen_test`.

Example
-------
The following example expects that when the message is processed by the
consumer, the consumer will raise a  :exc:`~rejected.consumer.MessageException`.

.. code:: python

    from rejected import consumer, testing

    import my_package


    class ConsumerTestCase(testing.AsyncTestCase):

        def get_consumer(self):
            return my_package.Consumer

        def get_settings(self):
            return {'remote_url': 'http://foo'}

        @testing.gen_test
        def test_consumer_raises_message_exception(self):
            with self.assertRaises(consumer.MessageException):
                yield self.process_message({'foo': 'bar'})

"""
import json
import time
import uuid

from helper import config
import mock
from pika import spec
from tornado import gen, testing

from rejected import consumer, data

gen_test = testing.gen_test
"""Testing equivalent of ``@gen.coroutine``, to be applied to test methods."""


[docs]class AsyncTestCase(testing.AsyncTestCase): """:class:`~unittest.TestCase` subclass for testing :class:`~tornado.ioloop.IOLoop`-based asynchronous code. """ _consumer = None def setUp(self): super(AsyncTestCase, self).setUp() self.correlation_id = str(uuid.uuid4()) self.consumer = self._create_consumer() def tearDown(self): super(AsyncTestCase, self).tearDown() if not self.consumer._finished: self.consumer.finish()
[docs] def get_consumer(self): """Override to return the consumer class for testing. :rtype: class """ return consumer.Consumer
[docs] def get_settings(self): """Override this method to provide settings to the consumer during construction. :return: """ return {}
@gen.coroutine
[docs] def process_message(self, message_body, content_type='application/json', message_type=None, properties=None, exchange='rejected', routing_key='routing-key'): """Process a message as if it were being delivered by RabbitMQ. When invoked, an AMQP message will be locally created and passed into the consumer. With using the default values for the method, if you pass in a JSON serializable object, the message body will automatically be JSON serialized. :param any message_body: the body of the message to create :param str content_type: The mime type :param str message_type: identifies the type of message to create :param dict properties: AMQP message properties :param str exchange: The exchange the message should appear to be from :param str routing_key: The message's routing key :raises: rejected.consumer.ConsumerException :raises: rejected.consumer.MessageException :raises: rejected.consumer.ProcessingException :returns: bool """ properties = properties or {} properties.setdefault('content_type', content_type) properties.setdefault('type', message_type) result = yield self.consumer._execute( self._create_message(message_body, properties, exchange, routing_key), data.Measurement()) if result == data.MESSAGE_ACK: raise gen.Return(True) elif result == data.CONSUMER_EXCEPTION: raise consumer.ConsumerException() elif result == data.MESSAGE_EXCEPTION: raise consumer.MessageException() elif result == data.PROCESSING_EXCEPTION: raise consumer.ProcessingException()
def _create_consumer(self): """Creates the per-test instance of the consumer that is going to be tested. :rtype: rejected.consumer.Consumer """ cls = self.get_consumer() settings = config.Data(self.get_settings()) obj = cls(settings, mock.Mock('rejected.process.Process')) obj._process = mock.Mock() obj._process.send_exception_to_sentry = mock.Mock() obj.initialize() obj._channel = mock.Mock('pika.channel.Channel') obj._channel.basic_publish = mock.Mock() obj._correlation_id = self.correlation_id return obj def _create_message(self, message, properties, exchange='rejected', routing_key='test'): """Create a message instance for the consumer. :param any message: the body of the message to create :param dict properties: AMQP message properties :param str exchange: The exchange the message should appear to be from :param str routing_key: The message's routing key :rtype: rejected.data.Message """ if isinstance(message, dict) and \ properties['content_type'] == 'application/json': message = json.dumps(message) return data.Message( channel=self.consumer._channel, method=spec.Basic.Deliver( 'ctag0', 1, False, exchange, routing_key), properties=spec.BasicProperties( app_id=properties.get('app_id', 'rejected.testing'), content_encoding=properties.get('content_encoding'), content_type=properties['content_type'], correlation_id=properties.get( 'correlation_id', self.correlation_id), delivery_mode=properties.get('delivery_mode', 1), expiration=properties.get('expiration'), headers=properties.get('headers'), message_id=properties.get('message_id', str(uuid.uuid4())), priority=properties.get('priority'), reply_to=properties.get('reply_to'), timestamp=properties.get('timestamp', int(time.time())), type=properties['type'], user_id=properties.get('user_id') ), body=message)