SmartConsumer¶
-
class
rejected.consumer.
SmartConsumer
(settings, process)[source]¶ Base class to ease the implementation of strongly typed message consumers that validate and automatically decode and deserialize the inbound message body based upon the message properties. Additionally, should one of the supported
content_encoding
types (gzip
orbzip2
) be specified in the message’s property, it will automatically be decoded.Supported MIME types for automatic deserialization are:
- application/json
- application/pickle
- application/x-pickle
- application/x-plist
- application/x-vnd.python.pickle
- application/vnd.python.pickle
- text/csv
- text/html (with beautifulsoup4 installed)
- text/xml (with beautifulsoup4 installed)
- text/yaml
- text/x-yaml
In any of the consumer base classes, if the
MESSAGE_TYPE
attribute is set, thetype
property of incoming messages will be validated against when a message is received, checking for string equality against theMESSAGE_TYPE
attribute. If they are not matched, the consumer will not process the message and will drop the message without an exception if theDROP_INVALID_MESSAGES
attribute is set toTrue
. If it isFalse
, aConsumerException
is raised.-
app_id
¶ Access the current message’s
app-id
property as an attribute of the consumer class.Return type: str
-
body
¶ Return the message body, unencoded if needed, deserialized if possible.
Return type: any
-
configuration
¶ Access the configuration stanza for the consumer as specified by the
config
section for the consumer in the rejected configuration.Deprecated since version 3.1: Use
settings
instead.Return type: dict
-
content_encoding
¶ Access the current message’s
content-encoding
property as an attribute of the consumer class.Return type: str
-
content_type
¶ Access the current message’s
content-type
property as an attribute of the consumer class. :rtype: str
-
correlation_id
¶ Access the current message’s
correlation-id
property as an attribute of the consumer class.Return type: str
-
exchange
¶ Access the exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
¶ Access the current message’s
expiration
property as an attribute of the consumer class.Return type: str
-
finish
()¶ Finishes message processing for the current message.
-
headers
¶ Access the current message’s
headers
property as an attribute of the consumer class.Return type: dict
-
initialize
()¶ Extend this method for any initialization tasks that occur only when the Consumer class is created.
-
log_exception
(msg_format, *args, **kwargs)¶ Customize the logging of uncaught exceptions.
Parameters: By default, this method will log the message using
logging.Logger.error()
and send the exception to Sentry. If an exception is currently active, then the traceback will be logged at the debug level.
-
message_id
¶ Access the current message’s
message-id
property as an attribute of the consumer class. :rtype: str
-
message_type
¶ Access the current message’s
type
property as an attribute of the consumer class.Return type: str
-
on_finish
()¶ Called after the end of a request. Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare.
on_finish
may not produce any output, as it is called after the response has been sent to the client.
-
prepare
()¶ Called when a message is received before process.
Asynchronous support: Decorate this method with .gen.coroutine or .return_future to make it asynchronous (the asynchronous decorator cannot be used on prepare).
If this method returns a .Future execution will not proceed until the .Future is done.
-
priority
¶ Access the current message’s
priority
property as an attribute of the consumer class.Return type: int
-
process
()¶ Extend this method for implementing your Consumer logic.
If the message can not be processed and the Consumer should stop after n failures to process messages, raise the ConsumerException.
Raises: ConsumerException Raises: NotImplementedError
-
properties
¶ Access the current message’s properties in dict form as an attribute of the consumer class.
Return type: dict
-
reply_to
¶ Access the current message’s
reply-to
property as an attribute of the consumer class.Return type: str
-
require_setting
(name, feature='this feature')¶ Raises an exception if the given app setting is not defined.
Parameters:
-
send_exception_to_sentry
(exc_info)¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client
¶ Access the Sentry raven
Client
instance orNone
Use this object to add tags or additional context to Sentry error reports (see
raven.base.Client.tags_context()
) or to report messages (viaraven.base.Client.captureMessage()
) directly to Sentry.Return type: raven.base.Client
-
set_sentry_context
(tag, value)¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings
¶ Access the consumer settings as specified by the
config
section for the consumer in the rejected configuration.Return type: dict
-
shutdown
()¶ Override to cleanly shutdown when rejected is stopping
-
statsd_add_timing
(key, duration)¶ Add a timing to statsd
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
statsd_incr
(key, value=1)¶ Increment the specified key in statsd if statsd is enabled.
Parameters:
-
statsd_track_duration
(key)¶ Time around a context and emit a statsd metric.
Parameters: key (str) – The key for the timing to track
-
timestamp
¶ Access the unix epoch timestamp value from the properties of the current message.
Return type: int
-
unset_sentry_context
(tag)¶ Remove a context tag from sentry
Parameters: tag (str) – The context tag to remove
-
yield_to_ioloop
()¶ Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.