# Zope class-advice declarations: the enclosing class (its header is not
# visible in this excerpt) is registered as a multi-adapter for
# (IFakeObject, Interface) and declares that it provides ILongPollEvent.
adapts(IFakeObject, Interface)
implements(ILongPollEvent)
def __init__(self, source):
    """Remember the object this event is about.

    :param source: The object being adapted; `event_key` reads its
        `ident` attribute.
    """
    self.source = source
def event_key(self):
    """Return the key for this event, derived from the source's ident.

    NOTE(review): other code in this file reads `event_key` as a plain
    attribute (e.g. `event.event_key`), so a `@property` decorator
    presumably precedes this definition outside the visible excerpt --
    confirm.
    """
    return "event-key-%s" % self.source.ident
def emit(self, **data):
    """Look up the message producer for this event's key.

    :param data: Arbitrary keyword arguments making up the event payload.
    """
    # Don't cargo-cult this; see .adapters.event.LongPollEvent instead.
    session = getUtility(IMessageSession)
    producer = session.getProducer(self.event_key)
    # NOTE(review): the visible excerpt ends here. The statement that
    # hands `data` to `producer` is not shown -- confirm that it follows.
# Test layer for the enclosing test case (class header not visible in this
# excerpt).
layer = LaunchpadFunctionalLayer
def test_subscribe(self):
    # subscribe() gets the ILongPollEvent for the given target and the
    # ILongPollSubscriber for the given request (or the current request is
    # discovered). It subscribes the latter to the event, then returns the
    # event.
    session = getUtility(IMessageSession)
    request = LaunchpadTestRequest()
    an_object = FakeObject(12345)
    # FakeEvent only needs to be registered while subscribe() performs the
    # adapter lookup.
    with ZopeAdapterFixture(FakeEvent):
        event = subscribe(an_object, request=request)
    self.assertIsInstance(event, FakeEvent)
    self.assertEqual("event-key-12345", event.event_key)
    # Emitting an event-key-12345 event will put something on the
    # subscriber's queue.
    event_data = {"1234": 5678}
    # The payload is passed as keyword arguments, matching
    # emit(self, **data).
    event.emit(**event_data)
    subscriber = ILongPollSubscriber(request)
    subscribe_queue = session.getConsumer(subscriber.subscribe_key)
    message = subscribe_queue.receive(timeout=5)
    self.assertEqual(event_data, message)
def test_subscribe_to_named_event(self):
    # When an event_name is given to subscribe(), a named adapter is used
    # to get the ILongPollEvent for the given target.
    request = LaunchpadTestRequest()
    an_object = FakeObject(12345)
    # FakeEvent is registered only under the name "foo", so the lookup
    # succeeds only if subscribe() asks for that named adapter.
    with ZopeAdapterFixture(FakeEvent, name="foo"):
        event = subscribe(an_object, event_name="foo", request=request)
    self.assertIsInstance(event, FakeEvent)
def test_emit(self):
    # emit() gets the ILongPollEvent for the given target and passes the
    # given data to its emit() method. It then returns the event.
    an_object = FakeObject(12345)
    with ZopeAdapterFixture(FakeEvent):
        # The first emit() is used only to obtain the event, and hence its
        # event_key.
        event = emit(an_object)
        session = getUtility(IMessageSession)
        producer = session.getProducer(event.event_key)
        subscribe_queue = session.getConsumer("whatever")
        # NOTE(review): the statement that associates subscribe_queue with
        # producer is not visible in this excerpt -- confirm it is present
        # at this point; the comment below relies on it.
        # Emit the event again; the subscribe queue was not associated
        # with the event before now.
        event_data = {"8765": 4321}
        event = emit(an_object, **event_data)
        message = subscribe_queue.receive(timeout=5)
        self.assertEqual(event_data, message)
def test_emit_named_event(self):
    # When an event_name is given to emit(), a named adapter is used to
    # get the ILongPollEvent for the given target.
    an_object = FakeObject(12345)
    # FakeEvent is registered only under the name "foo", so the lookup
    # succeeds only if emit() asks for that named adapter.
    with ZopeAdapterFixture(FakeEvent, name="foo"):
        event = emit(an_object, "foo")
    self.assertIsInstance(event, FakeEvent)