Multi-threaded Use NotesΒΆ
To ensure that the network communication module at the core of rabbitpy is
thread safe, the rabbitpy.io.IO
class is a daemonic Python thread
that uses a combination of threading.Event
, Queue.Queue
,
and a local cross-platform implementation of a read-write socket pair in
rabbitpy.IO.write_trigger
.
While ensuring that the core socket IO and dispatching of AMQP frames across threads goes a long way to make sure that multi-threaded applications can safely use rabbitpy, it does not protect against cross-thread channel utilization.
Due to the way that channels events are managed, it is recommend that you restrict the use of a channel to an individual thread. By not sharing channels across threads, you will ensure that you do not accidentally create issues with channel state in the AMQP protocol. As an asynchronous RPC style protocol, when you issue commands, such as a queue declaration, or are publishing a message, there are expectations in the conversation on a channel about the order of events and frames sent and received.
The following example uses the main Python thread to connect to RabbitMQ and then spawns a thread for publishing and a thread for consuming.
import rabbitpy
import threading
EXCHANGE = 'threading_example'
QUEUE = 'threading_queue'
ROUTING_KEY = 'test'
MESSAGE_COUNT = 100
def consumer(connection):
"""Consume MESSAGE_COUNT messages on the connection and then exit.
:param rabbitpy.Connection connection: The connection to consume on
"""
received = 0
with connection.channel() as channel:
for message in rabbitpy.Queue(channel, QUEUE).consume_messages():
print message.body
message.ack()
received += 1
if received == MESSAGE_COUNT:
break
def publisher(connection):
"""Pubilsh up to MESSAGE_COUNT messages on connection
on an individual thread.
:param rabbitpy.Connection connection: The connection to publish on
"""
with connection.channel() as channel:
for index in range(0, MESSAGE_COUNT):
message = rabbitpy.Message(channel, 'Message #%i' % index)
message.publish(EXCHANGE, ROUTING_KEY)
# Connect to RabbitMQ
with rabbitpy.Connection() as connection:
# Open the channel, declare and bind the exchange and queue
with connection.channel() as channel:
# Declare the exchange
exchange = rabbitpy.Exchange(channel, EXCHANGE)
exchange.declare()
# Declare the queue
queue = rabbitpy.Queue(channel, QUEUE)
queue.declare()
# Bind the queue to the exchange
queue.bind(EXCHANGE, ROUTING_KEY)
# Pass in the kwargs
kwargs = {'connection': connection}
# Start the consumer thread
consumer_thread = threading.Thread(target=consumer, kwargs=kwargs)
consumer_thread.start()
# Start the pubisher thread
publisher_thread = threading.Thread(target=publisher, kwargs=kwargs)
publisher_thread.start()
# Join the consumer thread, waiting for it to consume all MESSAGE_COUNT messages
consumer_thread.join()