1
# Copyright 2011 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""An API for messaging systems in Launchpad, e.g. RabbitMQ."""
10
from amqplib import client_0_8 as amqp
12
from threading import local as thread_local
14
from canonical.config import config
16
# Name of the AMQP exchange that send_now() publishes messages to.
LAUNCHPAD_EXCHANGE = "launchpad-exchange"
19
# Object that Messaging.send() joins to the current transaction (txn.join);
# its commit() hands every queued message to the utility's send_now().
# NOTE(review): in this view of the file the indentation is lost and several
# lines are not visible (bare numbers are residue of the original line
# numbering) -- confirm against the complete file before changing behaviour.
class MessagingDataManager:
20
# Keep a reference to the messaging utility whose messages are handled here.
def __init__(self, messaging_utility):
21
self.utility = messaging_utility
24
# NOTE(review): the def line this statement belongs to may be missing from
# this view. Also, commit() reads self.utility.locals.messages while this
# deletes self.utility.messages -- confirm which attribute is intended.
del self.utility.messages
29
# NOTE(review): the bodies of the tpc_* hooks below are not visible here.
def tpc_begin(self, txn):
32
def tpc_vote(self, txn):
35
def tpc_finish(self, txn):
38
def tpc_abort(self, txn):
44
# Send each queued message now; every queue entry is a dict that is expanded
# into keyword arguments for send_now().
def commit(self, txn):
45
for message in self.utility.locals.messages:
46
self.utility.send_now(**message)
52
# threading.local() storage; the code below stores the AMQP connection on it
# as `rabbit`, and send() stores the pending message list on it as `messages`.
locals = thread_local()
55
# Open an AMQP connection using the config.rabbitmq settings, but only when
# no `rabbit` attribute has been stored on self.locals yet.
# NOTE(review): the def line enclosing these statements is not visible in
# this view of the file.
if not hasattr(self.locals, "rabbit"):
56
conn = amqp.Connection(
57
host=config.rabbitmq.host, userid=config.rabbitmq.userid,
58
password=config.rabbitmq.password,
59
virtual_host=config.rabbitmq.virtual_host, insist=False)
60
# Remember the connection so later calls (e.g. send_now) can reuse it.
self.locals.rabbit = conn
63
# NOTE(review): these are arguments of a call whose opening line is not
# visible here; presumably an exchange declaration -- confirm.
LAUNCHPAD_EXCHANGE, "direct", durable=False,
66
# Send a message with the given routing key: the visible lines contain both
# an immediate send (send_now) and a queueing path that appends the message
# to self.locals.messages and joins a data manager to the transaction.
# NOTE(review): the code that consults `oncommit` is not visible in this
# view; presumably it selects between those two paths -- confirm.
def send(self, routing_key, json_data=None, pickle=None, oncommit=True):
68
# Pickle payloads are rejected outright: the parameter is not implemented.
if pickle is not None:
69
raise AssertionError("pickle param not implemented yet")
72
# NOTE(review): msg is not referenced by any visible line; send_now()
# builds its own amqp.Message from json_data.
msg = amqp.Message(json_data)
75
self.send_now(routing_key, json_data)
78
# Create the pending-message list on self.locals if it does not exist yet.
if not hasattr(self.locals, "messages"):
79
self.locals.messages = []
80
# NOTE(review): `transaction` is not imported in the visible part of the
# file -- confirm the import exists.
txn = transaction.get()
81
# NOTE(review): MessagingDataManager.__init__ takes a messaging_utility
# argument but none is passed here; presumably `self` is intended -- confirm.
txn.join(MessagingDataManager())
83
# Queue the message as the keyword arguments that
# MessagingDataManager.commit() later passes to send_now().
self.locals.messages.append(
84
dict(routing_key=routing_key, json_data=json_data))
87
# Publish json_data immediately: wrap it in an amqp.Message and publish it to
# LAUNCHPAD_EXCHANGE under routing_key, on a channel obtained from the
# connection stored at self.locals.rabbit.
# NOTE(review): `pickle` is accepted but unused in the visible lines, and this
# method does not create the connection itself -- self.locals.rabbit must
# already have been set. The closing parenthesis of the basic_publish call is
# not visible in this view of the file.
def send_now(self, routing_key, json_data=None, pickle=None):
88
channel = self.locals.rabbit.channel()
89
channel.basic_publish(
90
exchange=LAUNCHPAD_EXCHANGE,
91
routing_key=routing_key,
92
msg=amqp.Message(json_data)
96
# Module-level Messaging instance.
messaging = Messaging()