~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/messaging/queue.py

  • Committer: Curtis Hovey
  • Date: 2011-08-18 20:56:37 UTC
  • mto: This revision was merged to the branch mainline in revision 13736.
  • Revision ID: curtis.hovey@canonical.com-20110818205637-ae0pf9aexdea2mlb
Cleaned up docstrings and hushed lint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
__metaclass__ = type
7
7
__all__ = [
8
 
    "connect",
9
 
    "is_configured",
10
 
    "session",
11
 
    "unreliable_session",
 
8
    "RabbitRoutingKey",
 
9
    "RabbitQueue",
12
10
    ]
13
11
 
14
 
from collections import deque
15
 
from functools import partial
 
12
from amqplib import client_0_8 as amqp
16
13
import json
17
 
import sys
18
 
import threading
 
14
from threading import local as thread_local
19
15
import time
20
 
 
21
 
from amqplib import client_0_8 as amqp
22
16
import transaction
23
 
from transaction._transaction import Status as TransactionStatus
24
17
from zope.interface import implements
25
18
 
26
19
from canonical.config import config
27
20
from lp.services.messaging.interfaces import (
28
21
    IMessageConsumer,
29
22
    IMessageProducer,
30
 
    IMessageSession,
31
 
    MessagingUnavailable,
32
 
    QueueEmpty,
33
 
    QueueNotFound,
 
23
    EmptyQueueException,
34
24
    )
35
25
 
36
 
 
37
26
LAUNCHPAD_EXCHANGE = "launchpad-exchange"
38
27
 
39
28
 
40
 
class RabbitSessionTransactionSync:
41
 
 
42
 
    implements(transaction.interfaces.ISynchronizer)
43
 
 
44
 
    def __init__(self, session):
45
 
        self.session = session
46
 
 
47
 
    def newTransaction(self, txn):
48
 
        pass
49
 
 
50
 
    def beforeCompletion(self, txn):
51
 
        pass
52
 
 
53
 
    def afterCompletion(self, txn):
54
 
        if txn.status == TransactionStatus.COMMITTED:
55
 
            self.session.finish()
56
 
        else:
57
 
            self.session.reset()
58
 
 
59
 
 
60
 
def is_configured():
61
 
    """Return True if rabbit looks to be configured."""
62
 
    return not (
63
 
        config.rabbitmq.host is None or
64
 
        config.rabbitmq.userid is None or
65
 
        config.rabbitmq.password is None or
66
 
        config.rabbitmq.virtual_host is None)
67
 
 
68
 
 
69
 
def connect():
70
 
    """Connect to AMQP if possible.
71
 
 
72
 
    :raises MessagingUnavailable: If the configuration is incomplete.
73
 
    """
74
 
    if not is_configured():
75
 
        raise MessagingUnavailable("Incomplete configuration")
76
 
    return amqp.Connection(
77
 
        host=config.rabbitmq.host, userid=config.rabbitmq.userid,
78
 
        password=config.rabbitmq.password,
79
 
        virtual_host=config.rabbitmq.virtual_host, insist=False)
80
 
 
81
 
 
82
 
class RabbitSession(threading.local):
83
 
 
84
 
    implements(IMessageSession)
85
 
 
86
 
    exchange = LAUNCHPAD_EXCHANGE
87
 
 
88
 
    def __init__(self):
89
 
        super(RabbitSession, self).__init__()
90
 
        self._connection = None
91
 
        self._deferred = deque()
92
 
        # Maintain sessions according to transaction boundaries. Keep a strong
93
 
        # reference to the sync because the transaction manager does not. We
94
 
        # need one per thread (defining it here is enough to ensure that).
95
 
        self._sync = RabbitSessionTransactionSync(self)
96
 
        transaction.manager.registerSynch(self._sync)
97
 
 
98
 
    @property
99
 
    def is_connected(self):
100
 
        """See `IMessageSession`."""
101
 
        return (
102
 
            self._connection is not None and
103
 
            self._connection.transport is not None)
104
 
 
105
 
    def connect(self):
106
 
        """See `IMessageSession`.
107
 
 
108
 
        Open a connection for this thread if necessary. Connections cannot be
109
 
        shared between threads.
110
 
        """
111
 
        if self._connection is None or self._connection.transport is None:
112
 
            self._connection = connect()
113
 
        return self._connection
114
 
 
115
 
    def disconnect(self):
116
 
        """See `IMessageSession`."""
117
 
        if self._connection is not None:
118
 
            try:
119
 
                self._connection.close()
120
 
            finally:
121
 
                self._connection = None
122
 
 
123
 
    def flush(self):
124
 
        """See `IMessageSession`."""
125
 
        tasks = self._deferred
126
 
        while len(tasks) != 0:
127
 
            tasks.popleft()()
128
 
 
129
 
    def finish(self):
130
 
        """See `IMessageSession`."""
131
 
        try:
132
 
            self.flush()
133
 
        finally:
134
 
            self.reset()
135
 
 
136
 
    def reset(self):
137
 
        """See `IMessageSession`."""
138
 
        self._deferred.clear()
139
 
        self.disconnect()
140
 
 
141
 
    def defer(self, func, *args, **kwargs):
142
 
        """See `IMessageSession`."""
143
 
        self._deferred.append(partial(func, *args, **kwargs))
144
 
 
145
 
    def getProducer(self, name):
146
 
        """See `IMessageSession`."""
147
 
        return RabbitRoutingKey(self, name)
148
 
 
149
 
    def getConsumer(self, name):
150
 
        """See `IMessageSession`."""
151
 
        return RabbitQueue(self, name)
152
 
 
153
 
 
154
 
# Per-thread sessions.
155
 
session = RabbitSession()
156
 
session_finish_handler = (
157
 
    lambda event: session.finish())
158
 
 
159
 
 
160
 
class RabbitUnreliableSession(RabbitSession):
161
 
    """An "unreliable" `RabbitSession`.
162
 
 
163
 
    Unreliable in this case means that certain errors in deferred tasks are
164
 
    silently suppressed. This means that services can continue to function
165
 
    even in the absence of a running and fully functional message queue.
166
 
 
167
 
    Other types of errors are also caught because we don't want this
168
 
    subsystem to destabilise other parts of Launchpad but we nonetheless
169
 
    record OOPses for these.
170
 
 
171
 
    XXX: We only suppress MessagingUnavailable for now because we want to
172
 
    monitor this closely before we add more exceptions to the
173
 
    suppressed_errors list. Potential candidates are `MessagingException`,
174
 
    `IOError` or `amqp.AMQPException`.
175
 
    """
176
 
 
177
 
    suppressed_errors = (
178
 
        MessagingUnavailable,
179
 
        )
180
 
 
181
 
    def finish(self):
182
 
        """See `IMessageSession`.
183
 
 
184
 
        Suppresses errors listed in `suppressed_errors`. Also suppresses
185
 
        other errors but files an oops report for these.
186
 
        """
187
 
        try:
188
 
            super(RabbitUnreliableSession, self).finish()
189
 
        except self.suppressed_errors:
190
 
            pass
191
 
        except Exception:
192
 
            from canonical.launchpad.webapp import errorlog
193
 
            errorlog.globalErrorUtility.raising(sys.exc_info())
194
 
 
195
 
 
196
 
# Per-thread "unreliable" sessions.
197
 
unreliable_session = RabbitUnreliableSession()
198
 
unreliable_session_finish_handler = (
199
 
    lambda event: unreliable_session.finish())
 
29
class MessagingDataManager:
 
30
    """A Zope transaction data manager for Launchpad messaging.
 
31
 
 
32
    This class implements the necessary code to send messages only when
 
33
    the Zope transactions are committed.  It will iterate over the messages
 
34
    and send them using queue.send(oncommit=False).
 
35
    """
 
36
    def __init__(self, messages):
 
37
        self.messages = messages
 
38
 
 
39
    def _cleanup(self):
 
40
        """Completely remove the list of stored messages"""
 
41
        del self.messages[:]
 
42
 
 
43
    def abort(self, txn):
 
44
        self._cleanup()
 
45
 
 
46
    def tpc_begin(self, txn):
 
47
        pass
 
48
 
 
49
    def tpc_vote(self, txn):
 
50
        pass
 
51
 
 
52
    def tpc_finish(self, txn):
 
53
        self._cleanup()
 
54
 
 
55
    def tpc_abort(self, txn):
 
56
        self._cleanup()
 
57
 
 
58
    def sortKey(self):
 
59
        """Ensure that messages are sent after PostgreSQL connections
 
60
        are committed."""
 
61
        return "zz_messaging_%s" % id(self)
 
62
 
 
63
    def commit(self, txn):
 
64
        for send_func, data in self.messages:
 
65
            send_func(data)
 
66
        self._cleanup()
200
67
 
201
68
 
202
69
class RabbitMessageBase:
203
70
    """Base class for all RabbitMQ messaging."""
204
71
 
205
 
    def __init__(self, session):
206
 
        self.session = IMessageSession(session)
207
 
        self._channel = None
208
 
 
209
 
    @property
210
 
    def channel(self):
211
 
        if self._channel is None or not self._channel.is_open:
212
 
            connection = self.session.connect()
213
 
            self._channel = connection.channel()
214
 
            self._channel.exchange_declare(
215
 
                self.session.exchange, "direct", durable=False,
216
 
                auto_delete=False, nowait=False)
217
 
        return self._channel
 
72
    class_locals = thread_local()
 
73
 
 
74
    channel = None
 
75
 
 
76
    def _initialize(self):
 
77
        # Open a connection and channel for this thread if necessary.
 
78
        # Connections cannot be shared between threads.
 
79
        if not hasattr(self.class_locals, "rabbit_connection"):
 
80
            conn = amqp.Connection(
 
81
                host=config.rabbitmq.host, userid=config.rabbitmq.userid,
 
82
                password=config.rabbitmq.password,
 
83
                virtual_host=config.rabbitmq.virtual_host, insist=False)
 
84
            self.class_locals.rabbit_connection = conn
 
85
 
 
86
            # Initialize storage for oncommit messages.
 
87
            self.class_locals.messages = []
 
88
 
 
89
        conn = self.class_locals.rabbit_connection
 
90
        self.channel = conn.channel()
 
91
        #self.channel.access_request(
 
92
        #    '/data', active=True, write=True, read=True)
 
93
        self.channel.exchange_declare(
 
94
            LAUNCHPAD_EXCHANGE, "direct", durable=False,
 
95
            auto_delete=False, nowait=False)
 
96
 
 
97
    def close(self):
 
98
        # Note the connection is not closed - it is shared with other
 
99
        # queues. Just close our channel.
 
100
        if self.channel:
 
101
            self.channel.close()
 
102
 
 
103
    def _disconnect(self):
 
104
        """Disconnect from rabbit. The connection is shared, so this will
 
105
        break other RabbitQueue instances."""
 
106
        self.close()
 
107
        if hasattr(self.class_locals, 'rabbit_connection'):
 
108
            self.class_locals.rabbit_connection.close()
 
109
            del self.class_locals.rabbit_connection
218
110
 
219
111
 
220
112
class RabbitRoutingKey(RabbitMessageBase):
222
114
 
223
115
    implements(IMessageProducer)
224
116
 
225
 
    def __init__(self, session, routing_key):
226
 
        super(RabbitRoutingKey, self).__init__(session)
 
117
    def __init__(self, routing_key):
227
118
        self.key = routing_key
228
119
 
229
120
    def associateConsumer(self, consumer):
230
121
        """Only receive messages for requested routing key."""
231
 
        self.session.defer(self.associateConsumerNow, consumer)
232
 
 
233
 
    def associateConsumerNow(self, consumer):
234
 
        """Only receive messages for requested routing key."""
235
 
        # The queue will be auto-deleted 5 minutes after its last use.
236
 
        # http://www.rabbitmq.com/extensions.html#queue-leases
237
 
        self.channel.queue_declare(
238
 
            consumer.name, nowait=False, auto_delete=False,
239
 
            arguments={"x-expires": 300000})  # 5 minutes.
 
122
        self._initialize()
240
123
        self.channel.queue_bind(
241
 
            queue=consumer.name, exchange=self.session.exchange,
 
124
            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
 
125
            routing_key=self.key, nowait=False)
 
126
 
 
127
    def disassociateConsumer(self, consumer):
 
128
        """Stop receiving messages for the requested routing key."""
 
129
        self._initialize()
 
130
        self.channel.queue_unbind(
 
131
            queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
242
132
            routing_key=self.key, nowait=False)
243
133
 
244
134
    def send(self, data):
245
135
        """See `IMessageProducer`."""
246
 
        self.session.defer(self.sendNow, data)
 
136
        self._initialize()
 
137
        messages = self.class_locals.messages
 
138
        # XXX: The data manager should close channels and flush too
 
139
        if not messages:
 
140
            transaction.get().join(MessagingDataManager(messages))
 
141
        messages.append((self.send_now, data))
247
142
 
248
 
    def sendNow(self, data):
 
143
    def send_now(self, data):
249
144
        """Immediately send a message to the broker."""
 
145
        self._initialize()
250
146
        json_data = json.dumps(data)
251
147
        msg = amqp.Message(json_data)
252
148
        self.channel.basic_publish(
253
 
            exchange=self.session.exchange,
254
 
            routing_key=self.key, msg=msg)
 
149
            exchange=LAUNCHPAD_EXCHANGE, routing_key=self.key, msg=msg)
255
150
 
256
151
 
257
152
class RabbitQueue(RabbitMessageBase):
259
154
 
260
155
    implements(IMessageConsumer)
261
156
 
262
 
    def __init__(self, session, name):
263
 
        super(RabbitQueue, self).__init__(session)
 
157
    def __init__(self, name):
264
158
        self.name = name
 
159
        self._initialize()
 
160
        self.channel.queue_declare(self.name, nowait=False)
265
161
 
266
162
    def receive(self, timeout=0.0):
267
163
        """Pull a message from the queue.
268
164
 
269
165
        :param timeout: Wait a maximum of `timeout` seconds before giving up,
270
 
            trying at least once.
271
 
        :raises QueueEmpty: if the timeout passes.
 
166
            trying at least once.  If timeout is None, block forever.
 
167
        :raises: EmptyQueueException if the timeout passes.
272
168
        """
273
 
        endtime = time.time() + timeout
 
169
        starttime = time.time()
274
170
        while True:
275
 
            try:
276
 
                message = self.channel.basic_get(self.name)
277
 
                if message is None:
278
 
                    if time.time() > endtime:
279
 
                        raise QueueEmpty()
280
 
                    time.sleep(0.1)
281
 
                else:
282
 
                    self.channel.basic_ack(message.delivery_tag)
283
 
                    return json.loads(message.body)
284
 
            except amqp.AMQPChannelException, error:
285
 
                if error.amqp_reply_code == 404:
286
 
                    raise QueueNotFound()
287
 
                else:
288
 
                    raise
 
171
            message = self.channel.basic_get(self.name)
 
172
            if message is None:
 
173
                if time.time() > (starttime + timeout):
 
174
                    raise EmptyQueueException
 
175
                time.sleep(0.1)
 
176
            else:
 
177
                data = json.loads(message.body)
 
178
                self.channel.basic_ack(message.delivery_tag)
 
179
                return data
 
180
 
 
181
        # XXX The code below will be useful when we can implement this
 
182
        # properly.
 
183
        result = []
 
184
        def callback(msg):
 
185
            result.append(json.loads(msg.body))
 
186
 
 
187
        self.channel.basic_consume(self.name, callback=callback)
 
188
        self.channel.wait()
 
189
        return result[0]
 
190