Consumer
-
class rejected.consumer.Consumer(settings, process)
Base consumer class that defines the contract between rejected and consumer applications.
In any of the consumer base classes, if the MESSAGE_TYPE attribute is set, the type property of each incoming message is validated on receipt by checking for string equality against the MESSAGE_TYPE attribute. If the values do not match, the consumer will not process the message. When the DROP_INVALID_MESSAGES attribute is set to True, the message is dropped without an exception; when it is False, a MessageException is raised.
If a consumer raises a ProcessingException, the message that was being processed will be republished to the exchange specified by the ERROR_EXCHANGE attribute of the consumer’s class, using the routing key that was last used for the message. The original message body and properties are reused, and an additional header, X-Processing-Exceptions, is added containing the number of times a ProcessingException has been raised for the message. In combination with a queue that has x-message-ttl set and an x-dead-letter-exchange that points to the original exchange for the queue the consumer is consuming from, you can implement a delayed retry cycle for messages that are failing to process due to external resource or service issues.
If ERROR_MAX_RETRY is set on the class, the headers of each message will be inspected, and if the value of X-Processing-Exceptions is greater than or equal to the ERROR_MAX_RETRY value, the message will be dropped.
-
DROP_INVALID_MESSAGES = False
Drop a message if its type property doesn’t match MESSAGE_TYPE.
-
ERROR_EXCHANGE = 'errors'
The exchange to which a message is republished when a ProcessingException is raised.
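The retry bookkeeping described in the class overview can be sketched in plain Python. This is an illustration of the documented behavior, not rejected's implementation: the helpers `republish_headers` and `should_drop` are hypothetical, and the values in `RETRY_QUEUE_ARGUMENTS` (a 5000 ms delay and an exchange named `events`) are assumptions. Only the header name X-Processing-Exceptions and the queue arguments x-message-ttl and x-dead-letter-exchange come from the documentation.

```python
# Hypothetical helpers that mirror the documented retry bookkeeping;
# they are not part of the rejected API.
HEADER = 'X-Processing-Exceptions'

# Arguments for a retry queue bound to the error exchange. Once the TTL
# elapses, the broker dead-letters the message back to the original exchange.
RETRY_QUEUE_ARGUMENTS = {
    'x-message-ttl': 5000,               # assumed delay, in milliseconds
    'x-dead-letter-exchange': 'events',  # assumed original exchange name
}


def republish_headers(headers):
    """Return a copy of the headers with the failure counter incremented."""
    updated = dict(headers or {})
    updated[HEADER] = int(updated.get(HEADER, 0)) + 1
    return updated


def should_drop(headers, error_max_retry):
    """Return True once the counter is >= the configured maximum."""
    if error_max_retry is None:
        return False
    return int((headers or {}).get(HEADER, 0)) >= error_max_retry


# Simulate three consecutive processing failures for one message.
headers = {}
for _ in range(3):
    headers = republish_headers(headers)
```

With a maximum of 3, `should_drop(headers, 3)` is true after the third failure, which matches the "greater than or equal to" rule stated for ERROR_MAX_RETRY.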
-
MESSAGE_TYPE = None
Used to validate the message type of a message before processing.
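The interaction between MESSAGE_TYPE and DROP_INVALID_MESSAGES amounts to a small decision rule. The sketch below restates that rule in standalone Python, assuming nothing beyond what the class overview describes. The function `accept_message` is hypothetical, and `MessageException` is a local stand-in for the exception named in the documentation.

```python
class MessageException(Exception):
    """Local stand-in for the exception named in the documentation."""


def accept_message(expected_type, actual_type, drop_invalid):
    """Return True to process the message, False to drop it silently.

    Raises MessageException when the types differ and drop_invalid is False.
    """
    # No MESSAGE_TYPE configured, or an exact string match: process it.
    if expected_type is None or expected_type == actual_type:
        return True
    # Mismatch with DROP_INVALID_MESSAGES = True: drop without an exception.
    if drop_invalid:
        return False
    # Mismatch with DROP_INVALID_MESSAGES = False: surface the problem.
    raise MessageException(
        'expected type {!r}, got {!r}'.format(expected_type, actual_type))
```

The comparison is plain string equality, so `'User.Created'` and `'user.created'` count as different types.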
-
app_id
Access the current message’s app-id property as an attribute of the consumer class.
Return type: str
-
configuration
Access the configuration stanza for the consumer as specified by the config section for the consumer in the rejected configuration.
Deprecated since version 3.1: Use settings instead.
Return type: dict
-
content_encoding
Access the current message’s content-encoding property as an attribute of the consumer class.
Return type: str
-
content_type
Access the current message’s content-type property as an attribute of the consumer class.
Return type: str
-
correlation_id
Access the current message’s correlation-id property as an attribute of the consumer class.
Return type: str
-
exchange
Access the exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
Access the current message’s expiration property as an attribute of the consumer class.
Return type: str
-
headers
Access the current message’s headers property as an attribute of the consumer class.
Return type: dict
-
initialize()
Extend this method for any initialization tasks that occur only when the Consumer class is created.
-
log_exception(msg_format, *args, **kwargs)
Customize the logging of uncaught exceptions.
By default, this method will log the message using logging.Logger.error() and send the exception to Sentry. If an exception is currently active, then the traceback will be logged at the debug level.
-
message_id
Access the current message’s message-id property as an attribute of the consumer class.
Return type: str
-
message_type
Access the current message’s type property as an attribute of the consumer class.
Return type: str
-
on_finish()
Called after the end of a request. Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare. on_finish may not produce any output, as it is called after the response has been sent to the client.
-
prepare()
Called when a message is received, before process.
Asynchronous support: Decorate this method with tornado.gen.coroutine or tornado.concurrent.return_future to make it asynchronous (the asynchronous decorator cannot be used on prepare).
If this method returns a Future, execution will not proceed until the Future is done.
-
priority
Access the current message’s priority property as an attribute of the consumer class.
Return type: int
-
process()
Extend this method to implement your Consumer logic.
If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.
Raises: ConsumerException
Raises: NotImplementedError
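The hook descriptions above suggest an order: initialize when the consumer is created, then prepare, process, and on_finish for each message. The sketch below illustrates that order with a stand-in base class so that it runs without rejected or a broker. `SketchConsumer`, its `receive` driver method, and `AuditConsumer` are all hypothetical. A real consumer would extend rejected.consumer.Consumer, and rejected itself would invoke the hooks.

```python
class SketchConsumer(object):
    """Hypothetical stand-in for the base class; not rejected's code."""

    def __init__(self):
        self.calls = []
        self.initialize()

    def initialize(self):
        pass

    def prepare(self):
        pass

    def process(self):
        # Mirrors the documented contract: subclasses must provide process.
        raise NotImplementedError

    def on_finish(self):
        pass

    def receive(self):
        """Hypothetical driver: run the per-message hooks in order."""
        self.prepare()
        self.process()
        self.on_finish()


class AuditConsumer(SketchConsumer):
    """Example subclass that records which hooks ran."""

    def initialize(self):
        self.calls.append('initialize')  # runs once, at creation

    def prepare(self):
        self.calls.append('prepare')     # runs before process

    def process(self):
        self.calls.append('process')     # consumer logic goes here

    def on_finish(self):
        self.calls.append('on_finish')   # cleanup and logging


consumer = AuditConsumer()
consumer.receive()  # first message
consumer.receive()  # second message
```

After two messages, `consumer.calls` contains one `initialize` entry followed by two `prepare`, `process`, `on_finish` triples.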
-
properties
Access the current message’s properties in dict form as an attribute of the consumer class.
Return type: dict
-
reply_to
Access the current message’s reply-to property as an attribute of the consumer class.
Return type: str
-
require_setting(name, feature='this feature')
Raises an exception if the given app setting is not defined.
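A check of this kind is typically placed in initialize so that a missing setting fails fast. The standalone sketch below shows the general pattern only. It takes the settings dict as an explicit argument, and both the exception type (`KeyError`) and the message wording are assumptions, since the documentation does not state which exception rejected raises.

```python
def require_setting(settings, name, feature='this feature'):
    """Raise if ``name`` is missing from ``settings`` (a plain dict here).

    The KeyError type and the message text are assumptions made for this
    sketch; they are not taken from rejected.
    """
    if name not in settings:
        raise KeyError(
            'You must define the "{}" setting to use {}'.format(name, feature))


settings = {'redis_url': 'redis://localhost:6379/0'}
require_setting(settings, 'redis_url', 'caching')  # present: no exception
```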
-
send_exception_to_sentry(exc_info)
Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client
Access the Sentry raven Client instance or None.
Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.
Return type: raven.base.Client
-
set_sentry_context(tag, value)
Set a context tag in Sentry for the given key and value.
-
settings
Access the consumer settings as specified by the config section for the consumer in the rejected configuration.
Return type: dict
-
statsd_add_timing(key, duration)
Add a timing to statsd.
Parameters:
- key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
statsd_incr(key, value=1)
Increment the specified key in statsd if statsd is enabled.
-
statsd_track_duration(key)
Time around a context and emit a statsd metric.
Parameters: key (str) – The key for the timing to track
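"Time around a context" describes the usual context-manager timing pattern. The sketch below shows that pattern on its own, with a module-level dict standing in for a statsd client. The name `track_duration`, the `timings` dict, and the choice of seconds as the unit are assumptions; the documentation does not say which unit rejected emits.

```python
import contextlib
import time

timings = {}  # stand-in for a statsd client: key -> list of durations


@contextlib.contextmanager
def track_duration(key):
    """Time the enclosed block and record the elapsed seconds under key."""
    start = time.monotonic()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failures are timed too.
        timings.setdefault(key, []).append(time.monotonic() - start)


with track_duration('db.query'):
    time.sleep(0.01)  # placeholder for the work being measured
```

Recording in a `finally` clause is a design choice of this sketch: it keeps slow failures visible in the metric instead of silently omitting them.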
-
timestamp
Access the unix epoch timestamp value from the properties of the current message.
Return type: int
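Because the value is an integer epoch timestamp, it is often converted to a datetime before logging or comparison. A minimal sketch follows, assuming the value is expressed in seconds. The helper name `to_utc_datetime` and the sample value are hypothetical.

```python
from datetime import datetime, timezone


def to_utc_datetime(timestamp):
    """Convert a unix epoch value in seconds to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


# Sample epoch value standing in for a consumer's timestamp attribute.
published = to_utc_datetime(1500000000)
print(published.isoformat())  # 2017-07-14T02:40:00+00:00
```

Passing `tz=timezone.utc` yields a timezone-aware result, so the conversion does not depend on the local timezone of the host running the consumer.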
-