51
from canonical.config import config
52
from canonical.launchpad import webapp
53
from canonical.launchpad.webapp.errorlog import ErrorReportEvent
54
from lp.services.messaging.interfaces import MessagingUnavailable
55
from lp.services.messaging.rabbit import connect
56
from lp.services.timeline.requesttimeline import get_request_timeline
59
class PGBouncerFixture(pgbouncer.fixture.PGBouncerFixture):
60
"""Inserts a controllable pgbouncer instance in front of PostgreSQL.
62
The pgbouncer proxy can be shutdown and restarted at will, simulating
63
database outages and fastdowntime deployments.
31
class RabbitServer(rabbitfixture.server.RabbitServer):
32
"""A RabbitMQ server fixture with Launchpad-specific config.
34
:ivar service_config: A snippet of .ini that describes the `rabbitmq`
67
super(PGBouncerFixture, self).__init__()
70
from canonical.testing.layers import DatabaseLayer
72
DatabaseLayer._db_fixture.dbname,
73
DatabaseLayer._db_template_fixture.dbname,
77
for dbname in dbnames:
78
self.databases[dbname] = 'dbname=%s port=5432 host=localhost' % (
81
# Known users, pulled from security.cfg
82
security_cfg_path = os.path.join(
83
config.root, 'database', 'schema', 'security.cfg')
84
security_cfg_config = SafeConfigParser({})
85
security_cfg_config.read([security_cfg_path])
86
for section_name in security_cfg_config.sections():
87
self.users[section_name] = 'trusted'
88
self.users[section_name + '_ro'] = 'trusted'
89
self.users[os.environ['USER']] = 'trusted'
90
self.users['pgbouncer'] = 'trusted'
92
# Administrative access is useful for debugging.
93
self.admin_users = ['launchpad', 'pgbouncer', os.environ['USER']]
96
super(PGBouncerFixture, self).setUp()
98
# reconnect_store cleanup added first so it is run last, after
99
# the environment variables have been reset.
100
self.addCleanup(self._maybe_reconnect_stores)
102
# Abuse the PGPORT environment variable to get things connecting
103
# via pgbouncer. Otherwise, we would need to temporarily
104
# overwrite the database connection strings in the config.
105
self.useFixture(EnvironmentVariableFixture('PGPORT', str(self.port)))
107
# Reset database connections so they go through pgbouncer.
108
self._maybe_reconnect_stores()
110
def _maybe_reconnect_stores(self):
111
"""Force Storm Stores to reconnect if they are registered.
113
This is a noop if the Component Architecture is not loaded,
114
as we are using a test layer that doesn't provide database
117
from canonical.testing.layers import (
121
if is_ca_available():
39
super(RabbitServer, self).setUp()
40
self.config.service_config = dedent("""\
46
""" % self.config.port)
125
49
class ZopeAdapterFixture(Fixture):
190
114
# can add more flexibility then.
191
115
defineChecker(self.replacement, self.checker)
194
undefineChecker, self.replacement)
196
self.gsm.adapters.register,
197
(self.context_interface, self.request_interface),
199
self.name, self.original)
202
class ZopeUtilityFixture(Fixture):
203
"""A fixture that temporarily registers a different utility."""
205
def __init__(self, component, intf, name):
206
"""Construct a new fixture.
208
:param component: An instance of a class that provides this
210
:param intf: The Zope interface class to register, eg
212
:param name: A string name to match.
214
self.component = component
219
super(ZopeUtilityFixture, self).setUp()
220
gsm = getGlobalSiteManager()
221
gsm.registerUtility(self.component, self.intf, self.name)
223
gsm.unregisterUtility,
224
self.component, self.intf, self.name)
227
class Urllib2Fixture(Fixture):
228
"""Let tests use urllib to connect to an in-process Launchpad.
230
Initially this only supports connecting to launchpad.dev because
231
that is all that is needed. Later work could connect all
232
sub-hosts (e.g. bugs.launchpad.dev)."""
235
# Work around circular import.
236
from canonical.testing.layers import wsgi_application
237
super(Urllib2Fixture, self).setUp()
238
add_wsgi_intercept('launchpad.dev', 80, lambda: wsgi_application)
239
self.addCleanup(remove_wsgi_intercept, 'launchpad.dev', 80)
241
self.addCleanup(uninstall_opener)
244
class CaptureOops(Fixture):
245
"""Capture OOPSes notified via zope event notification.
247
:ivar oopses: A list of the oops objects raised while the fixture is
249
:ivar oops_ids: A set of observed oops ids. Used to de-dup reports
253
AMQP_SENTINEL = "STOP NOW"
256
super(CaptureOops, self).setUp()
258
self.oops_ids = set()
259
self.useFixture(ZopeEventHandlerFixture(self._recordOops))
261
self.connection = connect()
262
except MessagingUnavailable:
265
self.addCleanup(self.connection.close)
266
self.channel = self.connection.channel()
267
self.addCleanup(self.channel.close)
268
self.oops_config = oops.Config()
269
self.oops_config.publishers.append(self._add_oops)
272
def setUpQueue(self):
273
"""Sets up the queue to be used to receive reports.
275
The queue is autodelete which means we can only use it once: after
276
that it will be automatically nuked and must be recreated.
278
self.queue_name, _, _ = self.channel.queue_declare(
279
durable=True, auto_delete=True)
280
# In production the exchange already exists and is durable, but
281
# here we make it just-in-time, and tell it to go when the test
283
self.channel.exchange_declare(config.error_reports.error_exchange,
284
"fanout", durable=True, auto_delete=True)
285
self.channel.queue_bind(
286
self.queue_name, config.error_reports.error_exchange)
288
def _add_oops(self, report):
289
"""Add an oops if it isn't already recorded.
291
This is called from both amqp and in-appserver situations.
293
if report['id'] not in self.oops_ids:
294
self.oopses.append(report)
295
self.oops_ids.add(report['id'])
297
@adapter(ErrorReportEvent)
298
def _recordOops(self, event):
299
"""Callback from zope publishing to publish oopses."""
300
self._add_oops(event.object)
303
"""Sync the in-memory list of OOPS with the external OOPS source."""
306
# Send ourselves a message: when we receive this, we've processed all
307
# oopses created before sync() was invoked.
308
message = amqp.Message(self.AMQP_SENTINEL)
309
# Match what oops publishing does
310
message.properties["delivery_mode"] = 2
311
# Publish the message via a new channel (otherwise rabbit
312
# shortcircuits it straight back to us, apparently).
313
connection = connect()
315
channel = connection.channel()
317
channel.basic_publish(
318
message, config.error_reports.error_exchange,
319
config.error_reports.error_queue_key)
324
receiver = oops_amqp.Receiver(
325
self.oops_config, connect, self.queue_name)
326
receiver.sentinel = self.AMQP_SENTINEL
328
receiver.run_forever()
330
# Ensure we leave the queue ready to roll, or later calls to
335
class CaptureTimeline(Fixture):
336
"""Record and return the timeline.
338
This won't work well (yet) for code that starts new requests as they will
344
webapp.adapter.set_request_started(time.time())
345
self.timeline = get_request_timeline(
346
get_current_browser_request())
347
self.addCleanup(webapp.adapter.clear_request_started)
350
class DemoMode(Fixture):
351
"""Run with an is_demo configuration.
353
This changes the page styling, feature flag permissions, and perhaps
359
config.push('demo-fixture', '''
362
site_message = This is a demo site mmk. \
363
<a href="http://example.com">File a bug</a>.
365
self.addCleanup(lambda: config.pop('demo-fixture'))
118
super(ZopeViewReplacementFixture, self).tearDown()
119
undefineChecker(self.replacement)
120
self.gsm.adapters.register(
121
(self.context_interface, self.request_interface), Interface,
122
self.name, self.original)