~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/messaging/utility.py

first incantation of a messaging singleton

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
"""An API for messaging systems in Launchpad, e.g. RabbitMQ."""
 
5
 
 
6
__metaclass__ = type
 
7
__all__ = []
 
8
 
 
9
 
 
10
from amqplib import client_0_8 as amqp
 
11
import transaction
 
12
from threading import local as thread_local
 
13
 
 
14
from canonical.config import config
 
15
 
 
16
LAUNCHPAD_EXCHANGE = "launchpad-exchange"
 
17
 
 
18
 
 
19
class MessagingDataManager:
 
20
    def __init__(self, messaging_utility):
 
21
        self.utility = messaging_utility
 
22
 
 
23
    def _cleanup(self):
 
24
        del self.utility.messages
 
25
 
 
26
    def abort(self, txn):
 
27
        pass
 
28
 
 
29
    def tpc_begin(self, txn):
 
30
        pass
 
31
 
 
32
    def tpc_vote(self, txn):
 
33
        pass
 
34
 
 
35
    def tpc_finish(self, txn):
 
36
        self._cleanup()
 
37
 
 
38
    def tpc_abort(self, txn):
 
39
        self._cleanup()
 
40
 
 
41
    def sortKey(self):
 
42
        return None
 
43
 
 
44
    def commit(self, txn):
 
45
        for message in self.utility.locals.messages:
 
46
            self.utility.send_now(**message)
 
47
        self._cleanup()
 
48
 
 
49
 
 
50
class Messaging:
 
51
 
 
52
    locals = thread_local()
 
53
 
 
54
    def initialize(self):
 
55
        if not hasattr(self.locals, "rabbit"):
 
56
            conn = amqp.Connection(
 
57
                host=config.rabbitmq.host, userid=config.rabbitmq.userid,
 
58
                password=config.rabbitmq.password,
 
59
                virtual_host=config.rabbitmq.virtual_host, insist=False)
 
60
            self.locals.rabbit = conn
 
61
            ch = conn.channel()
 
62
            ch.exchange_declare(
 
63
                LAUNCHPAD_EXCHANGE, "direct", durable=False,
 
64
                auto_delete=False)
 
65
 
 
66
    def send(self, routing_key, json_data=None, pickle=None, oncommit=True):
 
67
        """XXX"""
 
68
        if pickle is not None:
 
69
            raise AssertionError("pickle param not implemented yet")
 
70
 
 
71
        self.initalize()
 
72
        msg = amqp.Message(json_data)
 
73
 
 
74
        if not oncommit:
 
75
            self.send_now(routing_key, json_data)
 
76
            return
 
77
 
 
78
        if not hasattr(self.locals, "messages"):
 
79
            self.locals.messages = []
 
80
            txn = transaction.get()
 
81
            txn.join(MessagingDataManager())
 
82
 
 
83
        self.locals.messages.append(
 
84
            dict(routing_key=routing_key, json_data=json_data))
 
85
 
 
86
 
 
87
    def send_now(self, routing_key, json_data=None, pickle=None):
 
88
        channel = self.locals.rabbit.channel()
 
89
        channel.basic_publish(
 
90
            exchange=LAUNCHPAD_EXCHANGE,
 
91
            routing_key=routing_key,
 
92
            msg=amqp.Message(json_data)
 
93
            )
 
94
 
 
95
 
 
96
messaging = Messaging()