Consumer

class rejected.consumer.Consumer(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None, drop_exchange=None)[source]

Base consumer class that defines the contract between rejected and consumer applications.

In any of the consumer base classes, if message_type is specified in the configuration (or set with the MESSAGE_TYPE attribute), the type property of each incoming message is validated against it when the message is received. If there is no match, the consumer does not process the message. When the drop_invalid_messages setting is True in the configuration (or the DROP_INVALID_MESSAGES attribute is set to True), the message is dropped without raising an exception. When it is False, a MessageException is raised.
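
The decision described above can be sketched in plain Python. This is a minimal illustration only, not rejected's implementation: validate_message_type and the local MessageException class are hypothetical stand-ins, and a single expected type is assumed.

```python
class MessageException(Exception):
    """Stand-in for rejected.consumer.MessageException (illustration only)."""


def validate_message_type(received_type, expected_type, drop_invalid_messages):
    """Return True to process the message, False to drop it silently.

    Raises MessageException when the type does not match and dropping
    invalid messages is disabled.
    """
    # No expected type configured, or the types match: process the message.
    if expected_type is None or received_type == expected_type:
        return True
    # Mismatch with drop_invalid_messages enabled: drop without an exception.
    if drop_invalid_messages:
        return False
    # Mismatch with drop_invalid_messages disabled: raise.
    raise MessageException(
        'Received {!r}, expected {!r}'.format(received_type, expected_type))
```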

If DROP_EXCHANGE is specified, either as an attribute of the consumer class or in the consumer configuration, a dropped message is published to that exchange before it is rejected in RabbitMQ. When the message is republished, four new values are added to the AMQP headers message property: X-Dropped-By, X-Dropped-Reason, X-Dropped-Timestamp, and X-Original-Exchange.

The X-Dropped-By header value contains the configured name of the consumer that dropped the message. X-Dropped-Reason contains the reason the message was dropped (e.g., invalid message type or maximum error count). X-Dropped-Timestamp contains the ISO-8601 formatted timestamp of when the message was dropped. Finally, X-Original-Exchange contains the original exchange that the message was published to.
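
To make the shape of these headers concrete, here is a small sketch that builds them on a plain dict. The helper add_dropped_headers is hypothetical, and the use of a UTC timestamp is an assumption; the text only states that the value is ISO-8601 formatted.

```python
import datetime


def add_dropped_headers(headers, consumer_name, reason, original_exchange):
    """Return a copy of headers with the four drop-related values added."""
    updated = dict(headers or {})
    updated['X-Dropped-By'] = consumer_name
    updated['X-Dropped-Reason'] = reason
    # Assumption: UTC is used here; only the ISO-8601 format is documented.
    updated['X-Dropped-Timestamp'] = datetime.datetime.now(
        datetime.timezone.utc).isoformat()
    updated['X-Original-Exchange'] = original_exchange
    return updated
```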

If a consumer raises a ProcessingException, the message that was being processed will be republished to the exchange specified by the error exchange configuration value or the ERROR_EXCHANGE attribute of the consumer’s class. The message will be published using the routing key that was last used for the message. The original message body and properties will be used and two additional header property values may be added:

  • X-Processing-Exception contains the string value of the
    exception that was raised, if specified.
  • X-Processing-Exceptions contains the quantity of processing
    exceptions that have been raised for the message.
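
The two header values listed above could be maintained roughly as follows. This is a sketch on a plain dict; add_processing_exception_headers is a hypothetical helper, not part of rejected.

```python
def add_processing_exception_headers(headers, error=None):
    """Return a copy of headers updated for one more processing exception."""
    updated = dict(headers or {})
    # The string value of the exception is only recorded when one is given.
    if error is not None:
        updated['X-Processing-Exception'] = str(error)
    # Count how many processing exceptions have been raised for the message.
    updated['X-Processing-Exceptions'] = (
        int(updated.get('X-Processing-Exceptions', 0)) + 1)
    return updated
```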

Combined with a queue that has x-message-ttl set and an x-dead-letter-exchange that points to the original exchange for the queue the consumer is consuming from, this lets you implement a delayed retry cycle for messages that fail to process because of external resource or service issues.
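
As an illustration, the arguments for such a retry queue might look like the dict below. The helper name, the 5000 ms delay, and the exchange name 'orders' are all hypothetical; x-message-ttl is expressed in milliseconds in RabbitMQ.

```python
def retry_queue_arguments(ttl_ms, original_exchange):
    """Build queue arguments for a delayed retry queue (illustrative)."""
    return {
        # How long a failed message waits in the retry queue, in milliseconds.
        'x-message-ttl': ttl_ms,
        # Where expired messages are dead-lettered: back to the original
        # exchange, so the consumer receives them again.
        'x-dead-letter-exchange': original_exchange,
    }


# Hypothetical values: retry after five seconds via the 'orders' exchange.
arguments = retry_queue_arguments(5000, 'orders')
```

A dict like this would be supplied as the arguments of the queue declaration for the queue bound to the error exchange.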

If error_max_retry is specified in the configuration or ERROR_MAX_RETRY is set on the class, the headers for each message will be inspected, and if the value of X-Processing-Exceptions is greater than or equal to the specified value, the message will be dropped.
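
The comparison described above amounts to a check like the following. This is a sketch only; exceeds_max_retry is a hypothetical helper operating on a plain headers dict.

```python
def exceeds_max_retry(headers, error_max_retry):
    """Return True if the message should be dropped for too many errors."""
    # Without a configured maximum, messages are never dropped for retries.
    if error_max_retry is None:
        return False
    count = int((headers or {}).get('X-Processing-Exceptions', 0))
    # Greater than or equal to the configured value means drop.
    return count >= error_max_retry
```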

As of 3.18.6, the MESSAGE_AGE_KEY class level attribute contains the default key part to use when recording stats for the message age. You can also override the message_age_key() method to create compound keys. For example, to create a key that includes the message priority:

from rejected import consumer


class Consumer(consumer.Consumer):

    def message_age_key(self):
        # Fall back to 0 when the message has no priority set
        return 'priority-{}.message_age'.format(self.priority or 0)

Note

Since 3.17, Consumer and PublishingConsumer have been combined into the same class.

As of 3.19.13, the ACK_PROCESSING_EXCEPTIONS class level attribute allows you to ack messages that raise a ProcessingException instead of rejecting them, allowing for dead-lettered messages to be constrained to […]. Defaults to False.

app_id

Access the current message’s app-id property as an attribute of the consumer class.

Return type:str
body

Access the opaque body from the current message.

Return type:str
content_encoding

Access the current message’s content-encoding AMQP message property as an attribute of the consumer class.

Return type:str
content_type

Access the current message’s content-type AMQP message property as an attribute of the consumer class.

Return type:str
correlation_id

Access the current message’s correlation-id AMQP message property as an attribute of the consumer class. If the message does not have a correlation-id, then each message is assigned a new UUIDv4 based correlation-id value.

Return type:str
exchange

Access the AMQP exchange the message was published to as an attribute of the consumer class.

Return type:str
expiration

Access the current message’s expiration AMQP message property as an attribute of the consumer class.

Return type:str
finish()[source]

Finishes message processing for the current message. If this is called in prepare(), the process() method is not invoked for the current message.
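
The effect of calling finish() in prepare() can be pictured with a small stand-in. StubConsumer below is hypothetical and only mimics the documented call order (prepare() first, then process() unless the message was finished); it is not rejected's code.

```python
class StubConsumer:
    """Hypothetical stand-in for the base class (illustration only)."""

    def __init__(self, body):
        self.body = body
        self.calls = []
        self._finished = False

    def finish(self):
        self._finished = True

    def prepare(self):
        pass

    def process(self):
        pass

    def execute(self):
        # Mimics the documented order: prepare(), then process() unless
        # finish() was already called for this message.
        self._finished = False
        self.prepare()
        if not self._finished:
            self.process()


class SkipEmpty(StubConsumer):

    def prepare(self):
        self.calls.append('prepare')
        if not self.body:
            self.finish()  # nothing to do, so process() is skipped

    def process(self):
        self.calls.append('process')
```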

headers

Access the current message’s headers AMQP message property as an attribute of the consumer class.

Return type:dict
initialize()[source]

Extend this method for any initialization tasks that occur only when the Consumer class is created.

io_loop

Access the tornado.ioloop.IOLoop instance for the current message.

New in version 3.18.4.

Return type:tornado.ioloop.IOLoop
message_age_key()[source]

Return the key part that is used in submitting message age stats. Override this method to change the key part. This could be used to include message priority in the key, for example.

New in version 3.18.6.

Return type:str
message_id

Access the current message’s message-id AMQP message property as an attribute of the consumer class.

Return type:str
message_type

Access the current message’s type AMQP message property as an attribute of the consumer class.

Return type:str
name

Property returning the name of the consumer class.

Return type:str
on_blocked(name)[source]

Called when a connection for this consumer is blocked.

Override this method to respond to being blocked.

New in version 3.17.

Parameters:name (str) – The connection name that is blocked
on_finish()[source]

Called after a message has been processed.

Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare(). on_finish may not produce any output, as it is called after all processing has taken place.

If an exception is raised during the processing of a message, on_finish() is not invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

on_unblocked(name)[source]

Called when a connection for this consumer is unblocked.

Override this method to respond to being unblocked.

New in version 3.17.

Parameters:name (str) – The connection name that is unblocked
prepare()[source]

Called when a message is received before process().

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

If this method returns a Future, execution will not proceed until the Future has completed.

priority

Access the current message’s priority AMQP message property as an attribute of the consumer class.

Return type:int
process()[source]

Extend this method for implementing your Consumer logic.

If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

Raises:rejected.consumer.ConsumerException
Raises:rejected.consumer.MessageException
Raises:rejected.consumer.ProcessingException
properties

Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.

Return type:dict
publish_message(exchange, routing_key, properties, body, channel=None)[source]

Publish a message to RabbitMQ on the same channel the original message was received on.

Parameters:
  • exchange (str) – The exchange to publish to
  • routing_key (str) – The routing key to publish with
  • properties (dict) – The message properties
  • body (str) – The message body
  • channel (str) – The channel/connection name to use. If it is not specified, the channel that the message was delivered on is used.
redelivered

Indicates if the current message has been redelivered.

Return type:bool
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)[source]

Reply to the received message.

If auto_id is True, a new UUIDv4 value will be generated for the message_id AMQP message property. The correlation_id AMQP message property will be set to the message_id of the original message. In addition, the timestamp will be set to the current time. If auto_id is False, neither the message_id nor the correlation_id AMQP property will be changed in the properties.

If exchange is not set, the exchange the message was received on will be used.

If reply_to is set in the original properties, it will be used as the routing key. If reply_to is not set in the properties and it is not passed in, a ValueError will be raised. If reply_to is set in the properties, it will be cleared out prior to the message being republished.

Parameters:
  • response_body (any) – The message body to send
  • properties (rejected.data.Properties) – Message properties to use
  • auto_id (bool) – Automatically shuffle message_id & correlation_id
  • exchange (str) – Override the exchange to publish to
  • reply_to (str) – Override the reply_to AMQP property
Raises:ValueError
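
The rules for reply() can be summarized in a short sketch that works on plain dicts. build_reply is a hypothetical helper, not the library's implementation. Two points are assumptions: that an explicitly passed reply_to takes precedence over the original message's value, and that the timestamp is only refreshed when auto_id is True.

```python
import time
import uuid


def build_reply(original, properties, auto_id=True, reply_to=None):
    """Return (routing_key, properties) for a reply, using plain dicts."""
    # Assumption: an explicit reply_to overrides the original message's value.
    routing_key = reply_to or original.get('reply_to')
    if not routing_key:
        raise ValueError('Missing reply_to routing key')
    updated = dict(properties)
    if auto_id:
        # Shuffle the IDs: new message_id, original message_id as correlation.
        updated['message_id'] = str(uuid.uuid4())
        updated['correlation_id'] = original.get('message_id')
        updated['timestamp'] = int(time.time())
    # reply_to is cleared before the message is republished.
    updated.pop('reply_to', None)
    return routing_key, updated
```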

reply_to

Access the current message’s reply-to AMQP message property as an attribute of the consumer class.

Return type:str
returned

Indicates if the message was previously published by the consumer and returned by RabbitMQ.

New in version 3.17.

Return type:bool
routing_key

Access the routing key for the current message.

Return type:str
send_exception_to_sentry(exc_info)[source]

Send an exception to Sentry if enabled.

Parameters:exc_info (tuple) – exception information as returned from sys.exc_info()
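
For reference, the exc_info tuple is the (type, value, traceback) triple that sys.exc_info() returns while an exception is being handled. The helper capture_exc_info below is hypothetical and only shows how such a tuple is obtained.

```python
import sys


def capture_exc_info(func):
    """Call func() and return sys.exc_info() if it raises, else None."""
    try:
        func()
    except Exception:
        # Must be called while the exception is being handled.
        return sys.exc_info()
    return None


exc_info = capture_exc_info(lambda: 1 / 0)
# Inside a consumer method, a tuple like this could be passed along with
# self.send_exception_to_sentry(exc_info).
```
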
sentry_client

Access the Sentry raven Client instance or None

Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.

Return type:raven.base.Client
set_sentry_context(tag, value)[source]

Set a context tag in Sentry for the given key and value.

Parameters:
  • tag (str) – The context tag name
  • value (str) – The context value
settings

Access the consumer settings as specified by the config section for the consumer in the rejected configuration.

Return type:dict
shutdown()[source]

Override to cleanly shutdown when rejected is stopping the consumer.

This could be used for closing database connections or other such activities.

stats_add_duration(key, duration)[source]

Add a duration to the per-message measurements

New in version 3.19.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds
stats_add_timing(key, duration)[source]

Add a timing to the per-message measurements

New in version 3.13.0.

Deprecated since version 3.19.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds
stats_incr(key, value=1)[source]

Increment the specified key in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by
stats_set_tag(key, value=1)[source]

Set the specified tag/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The tag key to set
  • value (int) – The value to set for the tag
stats_set_value(key, value=1)[source]

Set the specified key/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to set
  • value (int) – The value to set for the key
stats_track_duration(key)[source]

Time around a context and add to the per-message measurements

New in version 3.13.0.

Deprecated since version 3.19.0.

Parameters:key (str) – The key for the timing to track
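
The phrase "time around a context" suggests context-manager usage. The sketch below shows that pattern with the standard library only; track_duration and the durations dict are hypothetical stand-ins for the consumer's per-message measurements, not rejected's implementation.

```python
import contextlib
import time

# Stand-in for the per-message measurements (illustration only).
durations = {}


@contextlib.contextmanager
def track_duration(key):
    """Record how long the wrapped block took, in seconds, under key."""
    start = time.time()
    try:
        yield
    finally:
        # Record the elapsed time even if the block raised.
        durations.setdefault(key, []).append(time.time() - start)


with track_duration('db_query'):
    time.sleep(0.01)  # placeholder for the work being timed
```
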
statsd_add_timing(key, duration)[source]

Add a timing to the per-message measurements

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds

Deprecated since version 3.13.0.

statsd_incr(key, value=1)[source]

Increment the specified key in the per-message measurements

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by

Deprecated since version 3.13.0.

statsd_track_duration(key)[source]

Time around a context and add to the per-message measurements

Parameters:key (str) – The key for the timing to track

Deprecated since version 3.13.0.

timestamp

Access the unix epoch timestamp value from the AMQP message properties of the current message.

Return type:int
unset_sentry_context(tag)[source]

Remove a context tag from Sentry

Parameters:tag (str) – The context tag to remove
user_id

Access the user-id AMQP message property from the current message’s properties.

Return type:str
yield_to_ioloop()[source]

Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.

class rejected.consumer.PublishingConsumer(*args, **kwargs)[source]

Deprecated, functionality moved to rejected.consumer.Consumer

Deprecated since version 3.17.0.

app_id

Access the current message’s app-id property as an attribute of the consumer class.

Return type:str
body

Access the opaque body from the current message.

Return type:str
content_encoding

Access the current message’s content-encoding AMQP message property as an attribute of the consumer class.

Return type:str
content_type

Access the current message’s content-type AMQP message property as an attribute of the consumer class.

Return type:str
correlation_id

Access the current message’s correlation-id AMQP message property as an attribute of the consumer class. If the message does not have a correlation-id, then each message is assigned a new UUIDv4 based correlation-id value.

Return type:str
exchange

Access the AMQP exchange the message was published to as an attribute of the consumer class.

Return type:str
expiration

Access the current message’s expiration AMQP message property as an attribute of the consumer class.

Return type:str
finish()

Finishes message processing for the current message. If this is called in prepare(), the process() method is not invoked for the current message.

headers

Access the current message’s headers AMQP message property as an attribute of the consumer class.

Return type:dict
initialize()

Extend this method for any initialization tasks that occur only when the Consumer class is created.

io_loop

Access the tornado.ioloop.IOLoop instance for the current message.

New in version 3.18.4.

Return type:tornado.ioloop.IOLoop
message_age_key()

Return the key part that is used in submitting message age stats. Override this method to change the key part. This could be used to include message priority in the key, for example.

New in version 3.18.6.

Return type:str
message_id

Access the current message’s message-id AMQP message property as an attribute of the consumer class.

Return type:str
message_type

Access the current message’s type AMQP message property as an attribute of the consumer class.

Return type:str
name

Property returning the name of the consumer class.

Return type:str
on_blocked(name)

Called when a connection for this consumer is blocked.

Override this method to respond to being blocked.

New in version 3.17.

Parameters:name (str) – The connection name that is blocked
on_finish()

Called after a message has been processed.

Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare(). on_finish may not produce any output, as it is called after all processing has taken place.

If an exception is raised during the processing of a message, on_finish() is not invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

on_unblocked(name)

Called when a connection for this consumer is unblocked.

Override this method to respond to being unblocked.

New in version 3.17.

Parameters:name (str) – The connection name that is unblocked
prepare()

Called when a message is received before process().

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

If this method returns a Future, execution will not proceed until the Future has completed.

priority

Access the current message’s priority AMQP message property as an attribute of the consumer class.

Return type:int
process()

Extend this method for implementing your Consumer logic.

If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

Raises:rejected.consumer.ConsumerException
Raises:rejected.consumer.MessageException
Raises:rejected.consumer.ProcessingException
properties

Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.

Return type:dict
publish_message(exchange, routing_key, properties, body, channel=None)

Publish a message to RabbitMQ on the same channel the original message was received on.

Parameters:
  • exchange (str) – The exchange to publish to
  • routing_key (str) – The routing key to publish with
  • properties (dict) – The message properties
  • body (str) – The message body
  • channel (str) – The channel/connection name to use. If it is not specified, the channel that the message was delivered on is used.
redelivered

Indicates if the current message has been redelivered.

Return type:bool
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)

Reply to the received message.

If auto_id is True, a new UUIDv4 value will be generated for the message_id AMQP message property. The correlation_id AMQP message property will be set to the message_id of the original message. In addition, the timestamp will be set to the current time. If auto_id is False, neither the message_id nor the correlation_id AMQP property will be changed in the properties.

If exchange is not set, the exchange the message was received on will be used.

If reply_to is set in the original properties, it will be used as the routing key. If reply_to is not set in the properties and it is not passed in, a ValueError will be raised. If reply_to is set in the properties, it will be cleared out prior to the message being republished.

Parameters:
  • response_body (any) – The message body to send
  • properties (rejected.data.Properties) – Message properties to use
  • auto_id (bool) – Automatically shuffle message_id & correlation_id
  • exchange (str) – Override the exchange to publish to
  • reply_to (str) – Override the reply_to AMQP property
Raises:ValueError

reply_to

Access the current message’s reply-to AMQP message property as an attribute of the consumer class.

Return type:str
returned

Indicates if the message was previously published by the consumer and returned by RabbitMQ.

New in version 3.17.

Return type:bool
routing_key

Access the routing key for the current message.

Return type:str
send_exception_to_sentry(exc_info)

Send an exception to Sentry if enabled.

Parameters:exc_info (tuple) – exception information as returned from sys.exc_info()
sentry_client

Access the Sentry raven Client instance or None

Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.

Return type:raven.base.Client
set_sentry_context(tag, value)

Set a context tag in Sentry for the given key and value.

Parameters:
  • tag (str) – The context tag name
  • value (str) – The context value
settings

Access the consumer settings as specified by the config section for the consumer in the rejected configuration.

Return type:dict
shutdown()

Override to cleanly shutdown when rejected is stopping the consumer.

This could be used for closing database connections or other such activities.

stats_add_duration(key, duration)

Add a duration to the per-message measurements

New in version 3.19.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds
stats_add_timing(key, duration)

Add a timing to the per-message measurements

New in version 3.13.0.

Deprecated since version 3.19.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds
stats_incr(key, value=1)

Increment the specified key in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by
stats_set_tag(key, value=1)

Set the specified tag/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The tag key to set
  • value (int) – The value to set for the tag
stats_set_value(key, value=1)

Set the specified key/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to set
  • value (int) – The value to set for the key
stats_track_duration(key)

Time around a context and add to the per-message measurements

New in version 3.13.0.

Deprecated since version 3.19.0.

Parameters:key (str) – The key for the timing to track
statsd_add_timing(key, duration)

Add a timing to the per-message measurements

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value in seconds

Deprecated since version 3.13.0.

statsd_incr(key, value=1)

Increment the specified key in the per-message measurements

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by

Deprecated since version 3.13.0.

statsd_track_duration(key)

Time around a context and add to the per-message measurements

Parameters:key (str) – The key for the timing to track

Deprecated since version 3.13.0.

timestamp

Access the unix epoch timestamp value from the AMQP message properties of the current message.

Return type:int
unset_sentry_context(tag)

Remove a context tag from Sentry

Parameters:tag (str) – The context tag to remove
user_id

Access the user-id AMQP message property from the current message’s properties.

Return type:str
yield_to_ioloop()

Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.