"""
The :class:`rejected.testing.AsyncTestCase` provides a based class for the
easy creation of tests for your consumers. The test cases exposes multiple
methods to make it easy to setup a consumer and process messages. It is
build on top of :class:`tornado.testing.AsyncTestCase` which extends
:class:`unittest.TestCase`.
To get started, override the
:meth:`rejected.testing.AsyncTestCase.get_consumer` method.
Next, the :meth:`rejected.testing.AsyncTestCase.get_settings` method can be
overridden to define the settings that are passed into the consumer.
Finally, to invoke your Consumer as if it were receiving a message, the
:meth:`~rejected.testing.AsyncTestCase.process_message` method should be
invoked.
.. note:: Tests are asynchronous, so each test should be decorated with
:meth:`~rejected.testing.gen_test`.
Example
-------
The following example expects that when the message is processed by the
consumer, the consumer will raise a :exc:`~rejected.consumer.MessageException`.
.. code:: python
from rejected import consumer, testing
import my_package
class ConsumerTestCase(testing.AsyncTestCase):
def get_consumer(self):
return my_package.Consumer
def get_settings(self):
return {'remote_url': 'http://foo'}
@testing.gen_test
def test_consumer_raises_message_exception(self):
with self.assertRaises(consumer.MessageException):
yield self.process_message({'foo': 'bar'})
"""
import json
import time
import uuid
try:
from unittest import mock
except ImportError:
import mock
from helper import config
from pika import channel, spec
from pika.adapters import tornado_connection
from tornado import gen, ioloop, testing
try:
import raven
except ImportError:
raven = None
from . import consumer, data, process
gen_test = testing.gen_test
"""Testing equivalent of :func:`tornado.gen.coroutine`, to be applied to test
methods.
"""
[docs]class AsyncTestCase(testing.AsyncTestCase):
""":class:`tornado.testing.AsyncTestCase` subclass for testing
:class:`~rejected.consumer.Consumer` classes.
"""
_consumer = None
[docs] def setUp(self):
super(AsyncTestCase, self).setUp()
self.correlation_id = str(uuid.uuid4())
self.process = self._create_process()
self.consumer = self._create_consumer()
self.channel = self.process.connections['mock'].channel
[docs] def tearDown(self):
super(AsyncTestCase, self).tearDown()
if not self.consumer._finished:
self.consumer.finish()
@property
def published_messages(self):
"""Return a list of :class:`~rejected.testing.PublishedMessage`
that are extracted from all calls to
:meth:`~pika.channel.Channel.basic_publish` that are invoked during the
test. The properties attribute is the
:class:`pika.spec.BasicProperties`
instance that was created during publishing.
.. versionadded:: 3.18.9
:returns: list([:class:`~rejected.testing.PublishedMessage`])
"""
return [
PublishedMessage(
body=c[2]['body'],
exchange=c[2]['exchange'],
properties=c[2]['properties'],
routing_key=c[2]['routing_key'])
for c in self.channel.basic_publish.mock_calls
]
[docs] def get_consumer(self):
"""Override to return the consumer class for testing.
:rtype: :class:`rejected.consumer.Consumer`
"""
return consumer.Consumer
[docs] def get_settings(self):
"""Override this method to provide settings to the consumer during
construction. These settings should be from the `config` stanza
of the Consumer configuration.
:rtype: dict
"""
return {}
[docs] def create_message(self, message, properties=None,
exchange='rejected', routing_key='test'):
"""Create a message instance for use with the consumer in testing.
:param any message: the body of the message to create
:param dict properties: AMQP message properties
:param str exchange: The exchange the message should appear to be from
:param str routing_key: The message's routing key
:rtype: :class:`rejected.data.Message`
"""
if not properties:
properties = {}
if isinstance(message, dict) and \
properties.get('content_type') == 'application/json':
message = json.dumps(message)
return data.Message(
connection='mock',
channel=self.process.connections['mock'].channel,
method=spec.Basic.Deliver(
'ctag0', 1, False, exchange, routing_key),
properties=spec.BasicProperties(
app_id=properties.get('app_id', 'rejected.testing'),
content_encoding=properties.get('content_encoding'),
content_type=properties.get('content_type'),
correlation_id=properties.get(
'correlation_id', self.correlation_id),
delivery_mode=properties.get('delivery_mode', 1),
expiration=properties.get('expiration'),
headers=properties.get('headers'),
message_id=properties.get('message_id', str(uuid.uuid4())),
priority=properties.get('priority'),
reply_to=properties.get('reply_to'),
timestamp=properties.get('timestamp', int(time.time())),
type=properties.get('type'),
user_id=properties.get('user_id')
), body=message, returned=False)
@property
def measurement(self):
"""Return the :py:class:`rejected.data.Measurement` for the currently
assigned measurement object to the consumer.
:rtype: :class:`rejected.data.Measurement`
"""
return self.consumer._measurement
[docs] @gen.coroutine
def process_message(self,
message_body=None,
content_type='application/json',
message_type=None,
properties=None,
exchange='rejected',
routing_key='routing-key'):
"""Process a message as if it were being delivered by RabbitMQ. When
invoked, an AMQP message will be locally created and passed into the
consumer. With using the default values for the method, if you pass in
a JSON serializable object, the message body will automatically be
JSON serialized.
If an exception is not raised, a :class:`~rejected.data.Measurement`
instance is returned that will contain all of the measurements
collected during the processing of the message.
Example:
.. code:: python
class ConsumerTestCase(testing.AsyncTestCase):
@testing.gen_test
def test_consumer_raises_message_exception(self):
with self.assertRaises(consumer.MessageException):
result = yield self.process_message({'foo': 'bar'})
.. note:: This method is a co-routine and must be yielded to ensure
that your tests are functioning properly.
:param any message_body: the body of the message to create
:param str content_type: The mime type
:param str message_type: identifies the type of message to create
:param dict properties: AMQP message properties
:param str exchange: The exchange the message should appear to be from
:param str routing_key: The message's routing key
:raises: :exc:`rejected.consumer.ConsumerException`
:raises: :exc:`rejected.consumer.MessageException`
:raises: :exc:`rejected.consumer.ProcessingException`
:rtype: :class:`rejected.data.Measurement`
"""
properties = properties or {}
properties.setdefault('content_type', content_type)
properties.setdefault('correlation_id', self.correlation_id)
properties.setdefault('timestamp', int(time.time()))
properties.setdefault('type', message_type)
measurement = data.Measurement()
result = yield self.consumer.execute(
self.create_message(message_body, properties,
exchange, routing_key),
measurement)
if result == data.CONSUMER_EXCEPTION:
raise consumer.ConsumerException()
elif result == data.MESSAGE_EXCEPTION:
raise consumer.MessageException()
elif result == data.PROCESSING_EXCEPTION:
raise consumer.ProcessingException()
elif result == data.UNHANDLED_EXCEPTION:
raise AssertionError('UNHANDLED_EXCEPTION')
raise gen.Return(measurement)
@staticmethod
def _create_channel():
return mock.Mock(spec=channel.Channel)
def _create_connection(self):
obj = mock.Mock(spec=tornado_connection.TornadoConnection)
obj.ioloop = ioloop.IOLoop.current()
obj.channel = self._create_channel()
obj.channel.connection = obj
return obj
def _create_consumer(self):
"""Creates the per-test instance of the consumer that is going to be
tested.
:rtype: rejected.consumer.Consumer
"""
cls = self.get_consumer()
obj = cls(config.Data(self.get_settings()), self.process)
obj._message = self.create_message('dummy')
obj.set_channel('mock', self.process.connections['mock'].channel)
return obj
def _create_process(self):
obj = mock.Mock(spec=process.Process)
obj.connections = {'mock': self._create_connection()}
obj.sentry_client = mock.Mock(spec=raven.Client) if raven else None
return obj
[docs]class PublishedMessage(object):
"""Contains information about messages published during a test when
using :class:`rejected.testing.AsyncTestCase`.
:param str exchange: The exchange the message was published to
:param str routing_key: The routing key the message was published with
:param pika.spec.BasicProperties properties: AMQP message properties
:param bytes body: AMQP message body
.. versionadded:: 3.18.9
"""
__slots__ = ['exchange', 'routing_key', 'properties', 'body']
def __init__(self, exchange, routing_key, properties, body):
"""Create a new instance of the object.
:param str exchange: The exchange the message was published to
:param str routing_key: The routing key the message was published with
:param pika.spec.BasicProperties properties: AMQP message properties
:param bytes body: AMQP message body
"""
self.exchange = exchange
self.routing_key = routing_key
self.properties = properties
self.body = body
def __repr__(self):
"""Return the string representation of the object.
:rtype: str
"""
return '<PublishedMessage exchange="{}" routing_key="{}">'.format(
self.exchange, self.routing_key)