~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Messaging interfaces."""

__metaclass__ = type
__all__ = [
    'IMessageConsumer',
    'IMessageProducer',
    'IMessageSession',
    'MessagingException',
    'MessagingUnavailable',
    'QueueEmpty',
    'QueueNotFound',
    ]


from zope.interface import Interface
from zope.schema import Bool


class MessagingException(Exception):
    """Failure in messaging."""


class MessagingUnavailable(MessagingException):
    """Messaging systems are not available."""


class QueueNotFound(MessagingException):
    """Raised if the queue was not found."""


class QueueEmpty(MessagingException):
    """Raised if there are no queued messages on a non-blocking read."""


class IMessageSession(Interface):

    is_connected = Bool(
        u"Whether the session is connected to the messaging system.")

    def connect():
        """Connect to the messaging system.

        If the session is already connected this should be a no-op.
        """

    def disconnect():
        """Disconnect from the messaging system.

        If the session is already disconnected this should be a no-op.
        """

    def flush():
        """Run deferred tasks."""

    def finish():
        """Flush the session and reset."""

    def reset():
        """Reset the session."""

    def defer(func, *args, **kwargs):
        """Schedule something to happen when this session is finished."""

    def getProducer(name):
        """Get a `IMessageProducer` associated with this session."""

    def getConsumer(name):
        """Get a `IMessageConsumer` associated with this session."""


class IMessageConsumer(Interface):

    def receive(blocking=True):
        """Receive data from the queue.

        :raises EmptyQueue: If non-blocking and the queue is empty.
        """


class IMessageProducer(Interface):

    def send(data):
        """Serialize `data` into JSON and send it to the queue on commit."""

    def sendNow(data):
        """Serialize `data` into JSON and send it to the queue immediately."""

    def associateConsumer(consumer):
        """Make the consumer receive messages from this producer on commit.

        :param consumer: An `IMessageConsumer`
        """

    def associateConsumerNow(consumer):
        """Make the consumer receive messages from this producer.

        :param consumer: An `IMessageConsumer`
        """