~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/app/longpoll/tests/test_longpoll.py

  • Committer: Stuart Bishop
  • Date: 2011-09-28 12:49:24 UTC
  • mfrom: (9893.10.1 trivial)
  • mto: This revision was merged to the branch mainline in revision 14178.
  • Revision ID: stuart.bishop@canonical.com-20110928124924-m5a22fymqghw6c5i
Merged trivial into distinct-db-users.

Show diffs side-by-side

added

removed

Lines of Context:
5
5
 
6
6
__metaclass__ = type
7
7
 
8
 
from zope.component import adapts
 
8
from zope.component import (
 
9
    adapts,
 
10
    getUtility,
 
11
    )
9
12
from zope.interface import (
10
13
    Attribute,
11
14
    implements,
22
25
    ILongPollEvent,
23
26
    ILongPollSubscriber,
24
27
    )
25
 
from lp.services.messaging.queue import (
26
 
    RabbitQueue,
27
 
    RabbitRoutingKey,
28
 
    )
 
28
from lp.services.messaging.interfaces import IMessageSession
29
29
from lp.testing import TestCase
30
30
from lp.testing.fixture import ZopeAdapterFixture
31
31
 
45
45
 
46
46
class FakeEvent:
47
47
 
48
 
    adapts(IFakeObject, Interface)
 
48
    adapts(IFakeObject)
49
49
    implements(ILongPollEvent)
50
50
 
51
 
    def __init__(self, source, event):
 
51
    def __init__(self, source):
52
52
        self.source = source
53
 
        self.event = event
54
53
 
55
54
    @property
56
55
    def event_key(self):
57
 
        return "event-key-%s-%s" % (
58
 
            self.source.ident, self.event)
 
56
        return "event-key-%s" % self.source.ident
59
57
 
60
 
    def emit(self, data):
 
58
    def emit(self, **data):
61
59
        # Don't cargo-cult this; see .adapters.event.LongPollEvent instead.
62
 
        RabbitRoutingKey(self.event_key).send_now(data)
 
60
        session = getUtility(IMessageSession)
 
61
        producer = session.getProducer(self.event_key)
 
62
        producer.sendNow(data)
63
63
 
64
64
 
65
65
class TestFunctions(TestCase):
67
67
    layer = LaunchpadFunctionalLayer
68
68
 
69
69
    def test_subscribe(self):
70
 
        # subscribe() gets the ILongPollEvent for the given (target, event)
71
 
        # and the ILongPollSubscriber for the given request (or the current
72
 
        # request is discovered). It subscribes the latter to the event, then
73
 
        # returns the event.
 
70
        # subscribe() gets the ILongPollEvent for the given target and the
 
71
        # ILongPollSubscriber for the given request (or the current request is
 
72
        # discovered). It subscribes the latter to the event, then returns the
 
73
        # event.
 
74
        session = getUtility(IMessageSession)
74
75
        request = LaunchpadTestRequest()
75
76
        an_object = FakeObject(12345)
76
77
        with ZopeAdapterFixture(FakeEvent):
77
 
            event = subscribe(an_object, "foo", request=request)
 
78
            event = subscribe(an_object, request=request)
78
79
        self.assertIsInstance(event, FakeEvent)
79
 
        self.assertEqual("event-key-12345-foo", event.event_key)
80
 
        # Emitting an event-key-12345-foo event will put something on the
 
80
        self.assertEqual("event-key-12345", event.event_key)
 
81
        session.flush()
 
82
        # Emitting an event-key-12345 event will put something on the
81
83
        # subscriber's queue.
82
84
        event_data = {"1234": 5678}
83
 
        event.emit(event_data)
 
85
        event.emit(**event_data)
84
86
        subscriber = ILongPollSubscriber(request)
85
 
        subscribe_queue = RabbitQueue(subscriber.subscribe_key)
 
87
        subscribe_queue = session.getConsumer(subscriber.subscribe_key)
86
88
        message = subscribe_queue.receive(timeout=5)
87
89
        self.assertEqual(event_data, message)
88
90
 
 
91
    def test_subscribe_to_named_event(self):
 
92
        # When an event_name is given to subscribe(), a named adapter is used
 
93
        # to get the ILongPollEvent for the given target.
 
94
        request = LaunchpadTestRequest()
 
95
        an_object = FakeObject(12345)
 
96
        with ZopeAdapterFixture(FakeEvent, name="foo"):
 
97
            event = subscribe(an_object, event_name="foo", request=request)
 
98
        self.assertIsInstance(event, FakeEvent)
 
99
 
89
100
    def test_emit(self):
90
 
        # subscribe() gets the ILongPollEvent for the given (target, event)
91
 
        # and passes the given data to its emit() method. It then returns the
92
 
        # event.
 
101
        # emit() gets the ILongPollEvent for the given target and passes the
 
102
        # given data to its emit() method. It then returns the event.
93
103
        an_object = FakeObject(12345)
94
104
        with ZopeAdapterFixture(FakeEvent):
95
 
            event = emit(an_object, "bar", {})
96
 
            routing_key = RabbitRoutingKey(event.event_key)
97
 
            subscribe_queue = RabbitQueue("whatever")
98
 
            routing_key.associateConsumer(subscribe_queue)
 
105
            event = emit(an_object)
 
106
            session = getUtility(IMessageSession)
 
107
            producer = session.getProducer(event.event_key)
 
108
            subscribe_queue = session.getConsumer("whatever")
 
109
            producer.associateConsumerNow(subscribe_queue)
99
110
            # Emit the event again; the subscribe queue was not associated
100
111
            # with the event before now.
101
112
            event_data = {"8765": 4321}
102
 
            event = emit(an_object, "bar", event_data)
 
113
            event = emit(an_object, **event_data)
103
114
        message = subscribe_queue.receive(timeout=5)
104
115
        self.assertEqual(event_data, message)
 
116
 
 
117
    def test_emit_named_event(self):
 
118
        # When an event_name is given to emit(), a named adapter is used to
 
119
        # get the ILongPollEvent for the given target.
 
120
        an_object = FakeObject(12345)
 
121
        with ZopeAdapterFixture(FakeEvent, name="foo"):
 
122
            event = emit(an_object, "foo")
 
123
        self.assertIsInstance(event, FakeEvent)