Consumer
-
class rejected.consumer.Consumer(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None)
Base consumer class that defines the contract between rejected and consumer applications.
In any of the consumer base classes, if the message_type is specified in the configuration (or set with the MESSAGE_TYPE attribute), the type property of each incoming message is validated against it when the message is received. If there is no match, the consumer does not process the message. The message is dropped without an exception if the drop_invalid_messages setting is set to True in the configuration (or if the DROP_INVALID_MESSAGES attribute is set to True). If it is False, a MessageException is raised.
If a consumer raises a rejected.consumer.ProcessingException, the message that was being processed is republished to the exchange specified by the error exchange configuration value or the ERROR_EXCHANGE attribute of the consumer’s class. The message is published using the routing key that was last used for the message. The original message body and properties are used, and an additional header, X-Processing-Exceptions, is added that contains the number of times the message has had a ProcessingException raised for it. In combination with a queue that has x-message-ttl set and an x-dead-letter-exchange that points to the original exchange for the queue the consumer is consuming from, you can implement a delayed retry cycle for messages that fail to process due to external resource or service issues.
If error_max_retry is specified in the configuration or ERROR_MAX_RETRY is set on the class, the headers of each message are inspected, and if the value of X-Processing-Exceptions is greater than or equal to the specified value, the message is dropped.
Parameters:
- settings (dict) – The configuration from rejected
- process (rejected.process.Process) – The controlling process
- drop_invalid_messages (bool) – Drop a message if its type property doesn’t match the specified message type.
- message_type (str|list) – Used to validate the message type of a message before processing. This attribute can be set to a string that is matched against the AMQP message type or a list of acceptable message types.
- error_exchange (str) – The exchange to publish a message raising a ProcessingException to
- error_max_retry (int) – The number of rejected.consumer.ProcessingException exceptions raised on a message before it is dropped. If not specified, messages will never be dropped.
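The type-validation rule described above can be sketched in plain Python. This is only an illustration of the documented decision flow, not the library's implementation: the function name should_process and the local MessageException stand-in are assumptions made for this example.

```python
class MessageException(Exception):
    """Local stand-in for the exception the text says is raised on a mismatch."""


def should_process(message_type, accepted, drop_invalid_messages):
    """Return True to process and False to drop silently; raise otherwise.

    `accepted` mirrors the message_type setting: None, a str, or a list of str.
    """
    if accepted is None:
        return True  # no message_type configured, so nothing to validate
    allowed = [accepted] if isinstance(accepted, str) else list(accepted)
    if message_type in allowed:
        return True
    if drop_invalid_messages:
        return False  # dropped without an exception
    raise MessageException(
        "unexpected message type {!r}, expected one of {!r}".format(
            message_type, allowed))


print(should_process("order.created", ["order.created", "order.updated"], False))
print(should_process("user.deleted", "order.created", True))
```

The first call prints True because the type is in the accepted list. The second prints False because the mismatch is dropped silently when drop_invalid_messages is True.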
-
app_id
Access the current message’s app-id property as an attribute of the consumer class.
Return type: str
-
configuration
Access the configuration stanza for the consumer as specified by the config section for the consumer in the rejected configuration.
Deprecated since version 3.1: Use settings instead.
Return type: dict
-
content_encoding
Access the current message’s content-encoding property as an attribute of the consumer class.
Return type: str
-
content_type
Access the current message’s content-type property as an attribute of the consumer class.
Return type: str
-
correlation_id
Access the current message’s correlation-id property as an attribute of the consumer class.
Return type: str
-
exchange
Access the exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
Access the current message’s expiration property as an attribute of the consumer class.
Return type: str
-
headers
Access the current message’s headers property as an attribute of the consumer class.
Return type: dict
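The retry rule from the class description (drop once X-Processing-Exceptions is greater than or equal to error_max_retry) operates on this headers dict. A minimal sketch of that comparison follows. The helper names should_drop and headers_for_republish are hypothetical, and incrementing the counter on republish is an assumption about how the documented count is maintained.

```python
def should_drop(headers, error_max_retry):
    """Apply the documented rule: drop once the count reaches error_max_retry."""
    if error_max_retry is None:
        return False  # no limit configured, so messages are never dropped
    count = int((headers or {}).get("X-Processing-Exceptions", 0))
    return count >= error_max_retry


def headers_for_republish(headers):
    """Return a copy of the headers with the exception counter incremented."""
    updated = dict(headers or {})
    updated["X-Processing-Exceptions"] = (
        int(updated.get("X-Processing-Exceptions", 0)) + 1)
    return updated


headers = {"X-Processing-Exceptions": 2}
print(should_drop(headers, 3))                         # False: 2 < 3
print(should_drop(headers_for_republish(headers), 3))  # True: 3 >= 3
```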
-
initialize()
Extend this method for any initialization tasks that occur only when the Consumer class is created.
-
log_exception(msg_format, *args, **kwargs)
Customize the logging of uncaught exceptions.
By default, this method will log the message using logging.Logger.error() and send the exception to Sentry. If an exception is currently active, then the traceback will be logged at the debug level.
-
message_id
Access the current message’s message-id property as an attribute of the consumer class.
Return type: str
-
message_type
Access the current message’s type property as an attribute of the consumer class.
Return type: str
-
on_finish()
Called after the end of a request. Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare. on_finish may not produce any output, as it is called after the response has been sent to the client.
-
prepare()
Called when a message is received, before process.
Asynchronous support: Decorate this method with tornado.gen.coroutine or tornado.concurrent.return_future to make it asynchronous (the asynchronous decorator cannot be used on prepare).
If this method returns a Future, execution will not proceed until the Future is done.
-
priority
Access the current message’s priority property as an attribute of the consumer class.
Return type: int
-
process()
Extend this method to implement your Consumer logic.
If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.
Raises: ConsumerException
Raises: NotImplementedError
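The relationship between prepare, process, and on_finish can be illustrated with a small stand-in. SketchConsumer and its handle_message method are hypothetical and are not part of rejected. The sketch only mirrors the documented ordering (prepare before process, on_finish afterwards) and the NotImplementedError raised when process is not overridden.

```python
class SketchConsumer:
    """Stand-in base class for illustration; NOT rejected.consumer.Consumer."""

    def __init__(self):
        self.calls = []

    def prepare(self):
        self.calls.append("prepare")

    def process(self):
        # Subclasses are expected to override this method.
        raise NotImplementedError

    def on_finish(self):
        self.calls.append("on_finish")

    def handle_message(self):
        # Hypothetical driver showing the documented call order.
        self.prepare()
        self.process()
        self.on_finish()


class RecordingConsumer(SketchConsumer):
    def process(self):
        self.calls.append("process")


consumer = RecordingConsumer()
consumer.handle_message()
print(consumer.calls)  # ['prepare', 'process', 'on_finish']
```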
-
properties
Access the current message’s properties in dict form as an attribute of the consumer class.
Return type: dict
-
reply_to
Access the current message’s reply-to property as an attribute of the consumer class.
Return type: str
-
require_setting(name, feature='this feature')
Raises an exception if the given app setting is not defined.
Parameters:
-
send_exception_to_sentry(exc_info)
Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
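For reference, the tuple returned by sys.exc_info() has the form (type, value, traceback) and is only populated while an exception is being handled. The short standard-library sketch below shows how such a tuple is obtained. The helper name capture_exc_info is illustrative only, and the sketch does not call Sentry.

```python
import sys


def capture_exc_info():
    """Return the (type, value, traceback) tuple for a handled exception."""
    try:
        1 / 0
    except ZeroDivisionError:
        # Inside the except block, sys.exc_info() describes the active exception.
        return sys.exc_info()


exc_type, exc_value, exc_tb = capture_exc_info()
print(exc_type.__name__)  # ZeroDivisionError
```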
-
sentry_client
Access the Sentry raven Client instance or None.
Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.
Return type: raven.base.Client
-
set_sentry_context(tag, value)
Set a context tag in Sentry for the given key and value.
Parameters:
-
settings
Access the consumer settings as specified by the config section for the consumer in the rejected configuration.
Return type: dict
-
stats_add_timing(key, duration)
Add a timing to the per-message measurements.
Parameters:
- key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr(key, value=1)
Increment the specified key in the per-message measurements.
Parameters:
-
stats_set_tag(key, value=1)
Set the specified tag/value in the per-message measurements.
Parameters:
-
stats_set_value(key, value=1)
Set the specified key/value in the per-message measurements.
Parameters:
-
stats_track_duration(*args, **kwds)
Time around a context and add the duration to the per-message measurements.
Parameters: key (str) – The key for the timing to track
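"Time around a context" refers to the context-manager pattern: a timer starts on entry to a with block and the elapsed time is recorded on exit. The sketch below reproduces that pattern with the standard library only. The Measurements class and its method names are stand-ins invented for this example and do not reflect how rejected stores its per-message measurements.

```python
import contextlib
import time


class Measurements:
    """Hypothetical container for per-message timings."""

    def __init__(self):
        self.durations = {}

    def add_timing(self, key, duration):
        self.durations.setdefault(key, []).append(duration)

    @contextlib.contextmanager
    def track_duration(self, key):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the elapsed time even if the block raised.
            self.add_timing(key, time.perf_counter() - start)


measurements = Measurements()
with measurements.track_duration("lookup"):
    time.sleep(0.01)  # stands in for the work being timed

print(len(measurements.durations["lookup"]))  # 1
```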
-
statsd_add_timing(key, duration)
Add a timing to the per-message measurements.
Parameters:
- key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr(key, value=1)
Increment the specified key in the per-message measurements.
Deprecated since version 3.13.0.
-
statsd_track_duration(key)
Time around a context and add the duration to the per-message measurements.
Parameters: key (str) – The key for the timing to track
Deprecated since version 3.13.0.
-
timestamp
Access the Unix epoch timestamp value from the properties of the current message.
Return type: int
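Because the value is an integer count of seconds since the Unix epoch, it can be converted to a timezone-aware datetime with the standard library. The helper name to_utc_datetime is illustrative only.

```python
from datetime import datetime, timezone


def to_utc_datetime(epoch_seconds):
    """Convert a Unix epoch timestamp (seconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


print(to_utc_datetime(0).isoformat())  # 1970-01-01T00:00:00+00:00
```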