PublishingConsumer

class rejected.consumer.PublishingConsumer(settings, process)[source]

The PublishingConsumer extends the Consumer class, adding two methods, one that allows for publishing of messages back on the same channel that the consumer is communicating on and another for replying to messages, adding RPC reply semantics to the outbound message.

In any of the consumer base classes, if the MESSAGE_TYPE attribute is set, the type property of incoming messages will be validated against when a message is received, checking for string equality against the MESSAGE_TYPE attribute. If they are not matched, the consumer will not process the message and will drop the message without an exception if the DROP_INVALID_MESSAGES attribute is set to True. If it is False, a ConsumerException is raised.

app_id

Access the current message’s app-id property as an attribute of the consumer class.

Return type:str
body

Access the opaque body from the current message.

Return type:str
configuration

Access the configuration stanza for the consumer as specified by the config section for the consumer in the rejected configuration.

Deprecated since version 3.1: Use settings instead.

Return type:dict
content_encoding

Access the current message’s content-encoding property as an attribute of the consumer class.

Return type:str
content_type

Access the current message’s content-type property as an attribute of the consumer class. :rtype: str

correlation_id

Access the current message’s correlation-id property as an attribute of the consumer class.

Return type:str
exchange

Access the exchange the message was published to as an attribute of the consumer class.

Return type:str
expiration

Access the current message’s expiration property as an attribute of the consumer class.

Return type:str
finish()

Finishes message processing for the current message.

headers

Access the current message’s headers property as an attribute of the consumer class.

Return type:dict
initialize()[source]

Extend this method for any initialization tasks that occur only when the Consumer class is created.

log_exception(msg_format, *args, **kwargs)

Customize the logging of uncaught exceptions.

Parameters:
  • msg_format (str) – format of msg to log with self.logger.error
  • args – positional arguments to pass to self.logger.error
  • kwargs – keyword args to pass into self.logger.error
  • send_to_sentry (bool) – if omitted or truthy, this keyword will send the captured exception to Sentry (if enabled).

By default, this method will log the message using logging.Logger.error() and send the exception to Sentry. If an exception is currently active, then the traceback will be logged at the debug level.

message_id

Access the current message’s message-id property as an attribute of the consumer class. :rtype: str

message_type

Access the current message’s type property as an attribute of the consumer class.

Return type:str
name

Property returning the name of the consumer class.

Return type:str
on_finish()

Called after the end of a request. Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare. on_finish may not produce any output, as it is called after the response has been sent to the client.

prepare()

Called when a message is received before process.

Asynchronous support: Decorate this method with .gen.coroutine or .return_future to make it asynchronous (the asynchronous decorator cannot be used on prepare).

If this method returns a .Future execution will not proceed until the .Future is done.

priority

Access the current message’s priority property as an attribute of the consumer class.

Return type:int
process()

Extend this method for implementing your Consumer logic.

If the message can not be processed and the Consumer should stop after n failures to process messages, raise the ConsumerException.

Raises:ConsumerException
Raises:NotImplementedError
properties

Access the current message’s properties in dict form as an attribute of the consumer class.

Return type:dict
publish_message(exchange, routing_key, properties, body)[source]

Publish a message to RabbitMQ on the same channel the original message was received on.

Parameters:
  • exchange (str) – The exchange to publish to
  • routing_key (str) – The routing key to publish with
  • properties (dict) – The message properties
  • body (str) – The message body
redelivered

Indicates if the current message has been redelivered.

Return type:bool
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)[source]

Reply to the received message.

If auto_id is True, a new uuid4 value will be generated for the message_id and correlation_id will be set to the message_id of the original message. In addition, the timestamp will be assigned the current time of the message. If auto_id is False, neither the message_id or the correlation_id will be changed in the properties.

If exchange is not set, the exchange the message was received on will be used.

If reply_to is set in the original properties, it will be used as the routing key. If the reply_to is not set in the properties and it is not passed in, a ValueException will be raised. If reply to is set in the properties, it will be cleared out prior to the message being republished.

Parameters:
  • response_body (any) – The message body to send
  • properties (rejected.data.Properties) – Message properties to use
  • auto_id (bool) – Automatically shuffle message_id & correlation_id
  • exchange (str) – Override the exchange to publish to
  • reply_to (str) – Override the reply_to in the properties
Raises:

ValueError

reply_to

Access the current message’s reply-to property as an attribute of the consumer class.

Return type:str
require_setting(name, feature='this feature')

Raises an exception if the given app setting is not defined.

Parameters:
  • name (str) – The parameter name
  • feature (str) – A friendly name for the setting feature
routing_key

Access the routing key for the current message.

Return type:str
send_exception_to_sentry(exc_info)

Send an exception to Sentry if enabled.

Parameters:exc_info (tuple) – exception information as returned from sys.exc_info()
sentry_client

Access the Sentry raven Client instance or None

Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.

Return type:raven.base.Client
set_sentry_context(tag, value)

Set a context tag in Sentry for the given key and value.

Parameters:
  • tag (str) – The context tag name
  • value (str) – The context value
settings

Access the consumer settings as specified by the config section for the consumer in the rejected configuration.

Return type:dict
shutdown()

Override to cleanly shutdown when rejected is stopping

statsd_add_timing(key, duration)

Add a timing to statsd

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value
statsd_incr(key, value=1)

Increment the specified key in statsd if statsd is enabled.

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by
statsd_track_duration(key)

Time around a context and emit a statsd metric.

Parameters:key (str) – The key for the timing to track
timestamp

Access the unix epoch timestamp value from the properties of the current message.

Return type:int
unset_sentry_context(tag)

Remove a context tag from sentry

Parameters:tag (str) – The context tag to remove
user_id

Access the user-id from the current message’s properties.

Return type:str
yield_to_ioloop()

Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.