~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/app/longpoll/tests/test_longpoll.py

[r=abentley,
        rvb][no-qa] UpdatePreviewDiffJob now issues an ObjectModifiedEvent
        when it updates the preview_diff.

Show diffs side-by-side

added added

removed removed

Lines of Context:
45
45
 
46
46
class FakeEvent:
47
47
 
48
 
    adapts(IFakeObject, Interface)
 
48
    adapts(IFakeObject)
49
49
    implements(ILongPollEvent)
50
50
 
51
 
    def __init__(self, source, event):
 
51
    def __init__(self, source):
52
52
        self.source = source
53
 
        self.event = event
54
53
 
55
54
    @property
56
55
    def event_key(self):
57
 
        return "event-key-%s-%s" % (
58
 
            self.source.ident, self.event)
 
56
        return "event-key-%s" % self.source.ident
59
57
 
60
 
    def emit(self, data):
 
58
    def emit(self, **data):
61
59
        # Don't cargo-cult this; see .adapters.event.LongPollEvent instead.
62
60
        session = getUtility(IMessageSession)
63
61
        producer = session.getProducer(self.event_key)
69
67
    layer = LaunchpadFunctionalLayer
70
68
 
71
69
    def test_subscribe(self):
72
 
        # subscribe() gets the ILongPollEvent for the given (target, event)
73
 
        # and the ILongPollSubscriber for the given request (or the current
74
 
        # request is discovered). It subscribes the latter to the event, then
75
 
        # returns the event.
 
70
        # subscribe() gets the ILongPollEvent for the given target and the
 
71
        # ILongPollSubscriber for the given request (or the current request is
 
72
        # discovered). It subscribes the latter to the event, then returns the
 
73
        # event.
76
74
        session = getUtility(IMessageSession)
77
75
        request = LaunchpadTestRequest()
78
76
        an_object = FakeObject(12345)
79
77
        with ZopeAdapterFixture(FakeEvent):
80
 
            event = subscribe(an_object, "foo", request=request)
 
78
            event = subscribe(an_object, request=request)
81
79
        self.assertIsInstance(event, FakeEvent)
82
 
        self.assertEqual("event-key-12345-foo", event.event_key)
 
80
        self.assertEqual("event-key-12345", event.event_key)
83
81
        session.flush()
84
 
        # Emitting an event-key-12345-foo event will put something on the
 
82
        # Emitting an event-key-12345 event will put something on the
85
83
        # subscriber's queue.
86
84
        event_data = {"1234": 5678}
87
 
        event.emit(event_data)
 
85
        event.emit(**event_data)
88
86
        subscriber = ILongPollSubscriber(request)
89
87
        subscribe_queue = session.getConsumer(subscriber.subscribe_key)
90
88
        message = subscribe_queue.receive(timeout=5)
91
89
        self.assertEqual(event_data, message)
92
90
 
 
91
    def test_subscribe_to_named_event(self):
 
92
        # When an event_name is given to subscribe(), a named adapter is used
 
93
        # to get the ILongPollEvent for the given target.
 
94
        request = LaunchpadTestRequest()
 
95
        an_object = FakeObject(12345)
 
96
        with ZopeAdapterFixture(FakeEvent, name="foo"):
 
97
            event = subscribe(an_object, event_name="foo", request=request)
 
98
        self.assertIsInstance(event, FakeEvent)
 
99
 
93
100
    def test_emit(self):
94
 
        # subscribe() gets the ILongPollEvent for the given (target, event)
95
 
        # and passes the given data to its emit() method. It then returns the
96
 
        # event.
 
101
        # emit() gets the ILongPollEvent for the given target and passes the
 
102
        # given data to its emit() method. It then returns the event.
97
103
        an_object = FakeObject(12345)
98
104
        with ZopeAdapterFixture(FakeEvent):
99
 
            event = emit(an_object, "bar", {})
 
105
            event = emit(an_object)
100
106
            session = getUtility(IMessageSession)
101
107
            producer = session.getProducer(event.event_key)
102
108
            subscribe_queue = session.getConsumer("whatever")
104
110
            # Emit the event again; the subscribe queue was not associated
105
111
            # with the event before now.
106
112
            event_data = {"8765": 4321}
107
 
            event = emit(an_object, "bar", event_data)
 
113
            event = emit(an_object, **event_data)
108
114
        message = subscribe_queue.receive(timeout=5)
109
115
        self.assertEqual(event_data, message)
 
116
 
 
117
    def test_emit_named_event(self):
 
118
        # When an event_name is given to emit(), a named adapter is used to
 
119
        # get the ILongPollEvent for the given target.
 
120
        an_object = FakeObject(12345)
 
121
        with ZopeAdapterFixture(FakeEvent, name="foo"):
 
122
            event = emit(an_object, "foo")
 
123
        self.assertIsInstance(event, FakeEvent)