13
'MessagingUnavailable'
14
from zope.interface import Interface
17
class EmptyQueueException(Exception):
17
from zope.interface import (
23
class MessagingException(Exception):
24
"""Failure in messaging."""
27
class EmptyQueue(MessagingException):
18
28
"""Raised if there are no queued messages on a non-blocking read."""
31
class MessagingUnavailable(MessagingException):
32
"""Messaging systems are not available."""
35
class IMessageSession(Interface):
37
connection = Attribute("A connection to the messaging system.")
40
"""Connect to the messaging system.
42
If the session is already connected this should be a no-op.
46
"""Disconnect from the messaging system.
48
If the session is already disconnected this should be a no-op.
52
"""Run deferred tasks."""
55
"""Flush the session and reset."""
58
"""Reset the session."""
60
def defer(func, *args, **kwargs):
61
"""Schedule something to happen when this session is finished."""
63
def getProducer(name):
64
"""Get a `IMessageProducer` associated with this session."""
66
def getConsumer(name):
67
"""Get a `IMessageConsumer` associated with this session."""
22
70
class IMessageConsumer(Interface):
23
72
def receive(blocking=True):
24
73
"""Receive data from the queue.
26
:raises EmptyQueueException: If non-blocking and the queue is empty.
75
:raises EmptyQueue: If non-blocking and the queue is empty.
33
79
class IMessageProducer(Interface):
36
82
"""Serialize `data` into JSON and send it to the queue on commit."""
39
85
"""Serialize `data` into JSON and send it to the queue immediately."""
44
87
def associateConsumer(consumer):
88
"""Make the consumer receive messages from this producer on commit.
90
:param consumer: An `IMessageConsumer`
93
def associateConsumerNow(consumer):
45
94
"""Make the consumer receive messages from this producer.
47
96
:param consumer: An `IMessageConsumer`
50
def disassociateConsumer(consumer):
51
"""Make the consumer stop receiving messages from this producer.
53
:param consumer: An `IMessageConsumer`