~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Copyright 2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Long poll adapters."""

__metaclass__ = type
__all__ = [
    "generate_subscribe_key",
    "LongPollApplicationRequestSubscriber",
    ]

from uuid import uuid4

from lazr.restful.interfaces import IJSONRequestCache
from zope.component import (
    adapts,
    getUtility,
    )
from zope.interface import implements
from zope.publisher.interfaces import IApplicationRequest

from canonical.config import config
from lp.services.longpoll.interfaces import ILongPollSubscriber
from lp.services.messaging.interfaces import IMessageSession


def generate_subscribe_key():
    """Generate a suitable new, unique, subscribe key."""
    return "longpoll.subscribe.%s" % uuid4()


class LongPollApplicationRequestSubscriber:

    adapts(IApplicationRequest)
    implements(ILongPollSubscriber)

    def __init__(self, request):
        self.request = request

    @property
    def subscribe_key(self):
        objects = IJSONRequestCache(self.request).objects
        if "longpoll" in objects:
            return objects["longpoll"]["key"]
        return None

    def subscribe(self, event):
        cache = IJSONRequestCache(self.request)
        if "longpoll" not in cache.objects:
            cache.objects["longpoll"] = {
                "uri": config.txlongpoll.uri,
                "key": generate_subscribe_key(),
                "subscriptions": [],
                }
        session = getUtility(IMessageSession)
        subscribe_queue = session.getConsumer(self.subscribe_key)
        producer = session.getProducer(event.event_key)
        producer.associateConsumer(subscribe_queue)
        cache.objects["longpoll"]["subscriptions"].append(event.event_key)