48
adapts(IFakeObject, Interface)
49
49
implements(ILongPollEvent)
51
def __init__(self, source, event):
51
def __init__(self, source):
52
52
self.source = source
56
55
def event_key(self):
57
return "event-key-%s-%s" % (
58
self.source.ident, self.event)
56
return "event-key-%s" % self.source.ident
58
def emit(self, **data):
61
59
# Don't cargo-cult this; see .adapters.event.LongPollEvent instead.
62
RabbitRoutingKey(self.event_key).send_now(data)
60
session = getUtility(IMessageSession)
61
producer = session.getProducer(self.event_key)
62
producer.sendNow(data)
65
65
class TestFunctions(TestCase):
67
67
layer = LaunchpadFunctionalLayer
69
69
def test_subscribe(self):
70
# subscribe() gets the ILongPollEvent for the given (target, event)
71
# and the ILongPollSubscriber for the given request (or the current
72
# request is discovered). It subscribes the latter to the event, then
70
# subscribe() gets the ILongPollEvent for the given target and the
71
# ILongPollSubscriber for the given request (or the current request is
72
# discovered). It subscribes the latter to the event, then returns the
74
session = getUtility(IMessageSession)
74
75
request = LaunchpadTestRequest()
75
76
an_object = FakeObject(12345)
76
77
with ZopeAdapterFixture(FakeEvent):
77
event = subscribe(an_object, "foo", request=request)
78
event = subscribe(an_object, request=request)
78
79
self.assertIsInstance(event, FakeEvent)
79
self.assertEqual("event-key-12345-foo", event.event_key)
80
# Emitting an event-key-12345-foo event will put something on the
80
self.assertEqual("event-key-12345", event.event_key)
82
# Emitting an event-key-12345 event will put something on the
81
83
# subscriber's queue.
82
84
event_data = {"1234": 5678}
83
event.emit(event_data)
85
event.emit(**event_data)
84
86
subscriber = ILongPollSubscriber(request)
85
subscribe_queue = RabbitQueue(subscriber.subscribe_key)
87
subscribe_queue = session.getConsumer(subscriber.subscribe_key)
86
88
message = subscribe_queue.receive(timeout=5)
87
89
self.assertEqual(event_data, message)
91
def test_subscribe_to_named_event(self):
92
# When an event_name is given to subscribe(), a named adapter is used
93
# to get the ILongPollEvent for the given target.
94
request = LaunchpadTestRequest()
95
an_object = FakeObject(12345)
96
with ZopeAdapterFixture(FakeEvent, name="foo"):
97
event = subscribe(an_object, event_name="foo", request=request)
98
self.assertIsInstance(event, FakeEvent)
89
100
def test_emit(self):
90
# subscribe() gets the ILongPollEvent for the given (target, event)
91
# and passes the given data to its emit() method. It then returns the
101
# emit() gets the ILongPollEvent for the given target and passes the
102
# given data to its emit() method. It then returns the event.
93
103
an_object = FakeObject(12345)
94
104
with ZopeAdapterFixture(FakeEvent):
95
event = emit(an_object, "bar", {})
96
routing_key = RabbitRoutingKey(event.event_key)
97
subscribe_queue = RabbitQueue("whatever")
98
routing_key.associateConsumer(subscribe_queue)
105
event = emit(an_object)
106
session = getUtility(IMessageSession)
107
producer = session.getProducer(event.event_key)
108
subscribe_queue = session.getConsumer("whatever")
109
producer.associateConsumerNow(subscribe_queue)
99
110
# Emit the event again; the subscribe queue was not associated
100
111
# with the event before now.
101
112
event_data = {"8765": 4321}
102
event = emit(an_object, "bar", event_data)
113
event = emit(an_object, **event_data)
103
114
message = subscribe_queue.receive(timeout=5)
104
115
self.assertEqual(event_data, message)
117
def test_emit_named_event(self):
118
# When an event_name is given to emit(), a named adapter is used to
119
# get the ILongPollEvent for the given target.
120
an_object = FakeObject(12345)
121
with ZopeAdapterFixture(FakeEvent, name="foo"):
122
event = emit(an_object, "foo")
123
self.assertIsInstance(event, FakeEvent)