~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Copyright 2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""An API for messaging systems in Launchpad, e.g. RabbitMQ."""

__metaclass__ = type
__all__ = [
    "RabbitRoutingKey",
    "RabbitQueue",
    ]

from amqplib import client_0_8 as amqp
import json
from threading import local as thread_local
import time
import transaction
from zope.interface import implements

from canonical.config import config
from lp.services.messaging.interfaces import (
    IMessageConsumer,
    IMessageProducer,
    EmptyQueueException,
    )

LAUNCHPAD_EXCHANGE = "launchpad-exchange"


class MessagingDataManager:
    """A Zope transaction data manager for Launchpad messaging.

    This class implements the necessary code to send messages only when
    the Zope transactions are committed.  It will iterate over the messages
    and send them using queue.send(oncommit=False).
    """
    def __init__(self, messages):
        self.messages = messages

    def _cleanup(self):
        """Completely remove the list of stored messages"""
        del self.messages[:]

    def abort(self, txn):
        self._cleanup()

    def tpc_begin(self, txn):
        pass

    def tpc_vote(self, txn):
        pass

    def tpc_finish(self, txn):
        self._cleanup()

    def tpc_abort(self, txn):
        self._cleanup()

    def sortKey(self):
        """Ensure that messages are sent after PostgresSQL connections
        are committed."""
        return "zz_messaging_%s" % id(self)

    def commit(self, txn):
        for send_func, data in self.messages:
            send_func(data)
        self._cleanup()


class RabbitMessageBase:
    """Base class for all RabbitMQ messaging."""

    class_locals = thread_local()

    channel = None

    def _initialize(self):
        # Open a connection and channel for this thread if necessary.
        # Connections cannot be shared between threads.
        if not hasattr(self.class_locals, "rabbit_connection"):
            conn = amqp.Connection(
                host=config.rabbitmq.host, userid=config.rabbitmq.userid,
                password=config.rabbitmq.password,
                virtual_host=config.rabbitmq.virtual_host, insist=False)
            self.class_locals.rabbit_connection = conn

            # Initialize storage for oncommit messages.
            self.class_locals.messages = []

        conn = self.class_locals.rabbit_connection
        self.channel = conn.channel()
        #self.channel.access_request(
        #    '/data', active=True, write=True, read=True)
        self.channel.exchange_declare(
            LAUNCHPAD_EXCHANGE, "direct", durable=False,
            auto_delete=False, nowait=False)

    def close(self):
        # Note the connection is not closed - it is shared with other
        # queues. Just close our channel.
        if self.channel:
            self.channel.close()

    def _disconnect(self):
        """Disconnect from rabbit. The connection is shared, so this will
        break other RabbitQueue instances."""
        self.close()
        if hasattr(self.class_locals, 'rabbit_connection'):
            self.class_locals.rabbit_connection.close()
            del self.class_locals.rabbit_connection


class RabbitRoutingKey(RabbitMessageBase):
    """A RabbitMQ data origination point."""

    implements(IMessageProducer)

    def __init__(self, routing_key):
        self.key = routing_key

    def associateConsumer(self, consumer):
        """Only receive messages for requested routing key."""
        self._initialize()
        self.channel.queue_bind(
            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
            routing_key=self.key, nowait=False)

    def disassociateConsumer(self, consumer):
        """Stop receiving messages for the requested routing key."""
        self._initialize()
        self.channel.queue_unbind(
            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
            routing_key=self.key, nowait=False)

    def send(self, data):
        """See `IMessageProducer`."""
        self._initialize()
        messages = self.class_locals.messages
        # XXX: The data manager should close channels and flush too
        if not messages:
            transaction.get().join(MessagingDataManager(messages))
        messages.append((self.send_now, data))

    def send_now(self, data):
        """Immediately send a message to the broker."""
        self._initialize()
        json_data = json.dumps(data)
        msg = amqp.Message(json_data)
        self.channel.basic_publish(
            exchange=LAUNCHPAD_EXCHANGE, routing_key=self.key, msg=msg)


class RabbitQueue(RabbitMessageBase):
    """A RabbitMQ Queue."""

    implements(IMessageConsumer)

    def __init__(self, name):
        self.name = name
        self._initialize()
        self.channel.queue_declare(self.name, nowait=False)

    def receive(self, timeout=0.0):
        """Pull a message from the queue.

        :param timeout: Wait a maximum of `timeout` seconds before giving up,
            trying at least once.  If timeout is None, block forever.
        :raises: EmptyQueueException if the timeout passes.
        """
        starttime = time.time()
        while True:
            message = self.channel.basic_get(self.name)
            if message is None:
                if time.time() > (starttime + timeout):
                    raise EmptyQueueException
                time.sleep(0.1)
            else:
                data = json.loads(message.body)
                self.channel.basic_ack(message.delivery_tag)
                return data

        # XXX The code below will be useful when we can implement this
        # properly.
        result = []
        def callback(msg):
            result.append(json.loads(msg.body))

        self.channel.basic_consume(self.name, callback=callback)
        self.channel.wait()
        return result[0]