~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/app/longpoll/adapters/event.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-07-05 17:43:37 UTC
  • mfrom: (13333.6.49 lp-app-longpoll)
  • Revision ID: launchpad@pqm.canonical.com-20110705174337-4ygi4i0soakb5cs2
[r=gmb][no-qa] New lp.app.longpoll package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
"""Long poll adapters."""
 
5
 
 
6
__metaclass__ = type
 
7
__all__ = [
 
8
    "generate_event_key",
 
9
    "LongPollEvent",
 
10
    ]
 
11
 
 
12
from lp.services.messaging.queue import RabbitRoutingKey
 
13
 
 
14
 
 
15
def generate_event_key(*components):
    """Generate a suitable event name.

    Each component is converted with `str` and the results are joined
    with dots beneath the "longpoll.event." prefix, so the components
    ("job", 12) give "longpoll.event.job.12".

    :param components: One or more values that make up the key.
    :return: The dotted event key string.
    :raises AssertionError: If no components are given.
    """
    # `components` is always a tuple, so an empty one is simply falsy.
    if not components:
        raise AssertionError(
            "Event keys must contain at least one component.")
    return "longpoll.event.%s" % ".".join(
        str(component) for component in components)
 
22
 
 
23
 
 
24
class LongPollEvent:
    """Base-class for event adapters.

    Instances hold the `source` and `event` objects they were created
    with.  This base class does not provide an event key of its own:
    reading `event_key` raises `NotImplementedError`.

    Sub-classes need to declare something along the lines of:

        adapts(Interface, Interface)
        implements(ILongPollEvent)

    """

    def __init__(self, source, event):
        """Remember the adapted objects.

        :param source: Stored unchanged as `self.source`.
        :param event: Stored unchanged as `self.event`.
        """
        self.source = source
        self.event = event

    @property
    def event_key(self):
        """See `ILongPollEvent`.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(self.__class__.event_key)

    def emit(self, data):
        """See `ILongPollEvent`.

        Builds a payload dict with the keys "event_key" and "event_data"
        and sends it via a `RabbitRoutingKey` created for this event's
        key.

        :param data: Value placed under "event_data" in the payload.
        """
        # Read the property once so that the key recorded in the payload
        # is the very same value that is used for routing.
        event_key = self.event_key
        payload = {"event_key": event_key, "event_data": data}
        routing_key = RabbitRoutingKey(event_key)
        routing_key.send(payload)