The lp.services.webapp.adapter module provides a Storm store that can
prevent SQL statements from being executed if a request runs longer than
a configured length of time. It uses lp.services.config for configuration
parameters.

Imports and test setup:

>>> import threading
>>> import transaction
>>> from zope.component import getUtility
>>> from lazr.restful.utils import get_current_browser_request
>>> from storm.zope.interfaces import IZStorm
>>> from lp.services.webapp.interfaces import (
...     IStoreSelector, MAIN_STORE, MASTER_FLAVOR)
>>> from lp.services.config import config
>>> import lp.services.webapp.adapter
>>> from lp.services.webapp.adapter import (
...     clear_request_started, get_request_statements,
...     set_request_started)
>>> from lp.testing.layers import DatabaseLayer
>>> from lp.services.timeline.requesttimeline import get_request_timeline

There are several possible database connections available via the
IStoreSelector utility.

>>> store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
>>> dbname = DatabaseLayer._db_fixture.dbname
>>> active_name = store.execute("SELECT current_database()").get_one()[0]
>>> if active_name != dbname: print '%s != %s' % (active_name, dbname)
>>> active_name == dbname
True

Statement Logging
=================

While a request is in progress, the Launchpad database adapter keeps a log
of the statements executed.

First show that the statement log is not maintained outside of a request:

>>> get_request_statements()
[]
>>> store.execute('SELECT 1', noresult=True)
>>> store.execute('SELECT 2', noresult=True)
>>> get_request_statements()
[]

Now begin a request and issue a number of statements. We include statements
with bind variables to ensure these are logged correctly, especially for
the case of LIKE, which uses % characters.

>>> set_request_started()
>>> store.execute('SELECT 1', noresult=True)
>>> store.execute('SELECT 2', noresult=True)
>>> store.execute('SELECT * FROM person where name = ?',
...     [u'fred'], noresult=True)
>>> store.execute("SELECT * FROM person where name = '%s foo'",
...     noresult=True)
>>> store.execute('SELECT * FROM person where id = ?',
...     [2], noresult=True)
>>> store.execute("SELECT * FROM person where name like '%%' || ?",
...     [u'fred'], noresult=True)
>>> store.execute("SELECT * FROM person where name like '%d foo'||'%%'",
...     noresult=True)
>>> for _, _, _, statement, _ in get_request_statements():
...     print statement
SELECT 1
SELECT 2
SELECT * FROM person where name = u'fred'
SELECT * FROM person where name = '%s foo'
SELECT * FROM person where id = 2
SELECT * FROM person where name like '%%' || u'fred'
SELECT * FROM person where name like '%d foo'||'%%'

A timeline is created too:

>>> timeline = get_request_timeline(get_current_browser_request())
>>> for action in timeline.actions:
...     if not action.category.startswith("SQL-"):
...         continue
...     print action.detail
SELECT 1
SELECT 2
SELECT * FROM person where name = u'fred'
SELECT * FROM person where name = '%s foo'
SELECT * FROM person where id = 2
SELECT * FROM person where name like '%%' || u'fred'
SELECT * FROM person where name like '%d foo'||'%%'

After we complete the request, the statement log is cleared:

>>> clear_request_started()
>>> get_request_statements()
[]
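Code that begins a request by hand will normally want to pair these two
calls even if the work in between fails. Here is a minimal sketch of that
pattern, shown as a plain snippet rather than a tested example; do_work()
is a hypothetical stand-in for whatever code issues SQL through the store:

    # Sketch only: always clear the request, even if the work fails, so
    # the statement log does not leak into the next request.
    set_request_started()
    try:
        do_work()  # hypothetical helper issuing SQL through the store
    finally:
        clear_request_started()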
By default, all statements of a request are recorded in a Python list. We
can optionally specify another container where the statements are stored.
If we use a LimitedList, we can limit the number of recorded statements.

>>> from lp.services.limitedlist import LimitedList
>>> set_request_started(request_statements=LimitedList(2))
>>> store.execute('SELECT 1', noresult=True)
>>> store.execute('SELECT 2', noresult=True)
>>> store.execute('SELECT 3', noresult=True)
>>> for _, _, _, statement, _ in get_request_statements():
...     print statement
SELECT 2
SELECT 3
>>> clear_request_started()

transaction.commit() and transaction.abort() calls are logged too, if we
pass the transaction manager to set_request_started(). Note that aborted
transactions still have the status "Active".

>>> set_request_started(txn=transaction.manager)
>>> store.execute('SELECT 1', noresult=True)
>>> store.execute('SELECT 2', noresult=True)
>>> transaction.commit()
>>> store.execute('SELECT 3', noresult=True)
>>> transaction.abort()
>>> for _, _, _, statement, _ in get_request_statements():
...     print statement
SELECT 1
SELECT 2
Transaction completed, status: Committed
SELECT 3
Transaction completed, status: Active
>>> clear_request_started()

While you're not meant to call clear_request_started() without having a
request in progress, some exception handlers do. We raise a warning and
let it pass.

>>> import warnings
>>> with warnings.catch_warnings(record=True) as no_request_warning:
...     clear_request_started()
>>> print no_request_warning[0].message
clear_request_started() called outside of a request

Statement Timeout
=================

The timeout is set in the launchpad.conf file. By default it is unset,
which corresponds to no timeout:

>>> print config.database.db_statement_timeout
None

Connections created with the database adapter will use this timeout as the
PostgreSQL statement timeout (a value of zero means no timeout):

>>> def current_statement_timeout(store):
...     result = store.execute('SHOW statement_timeout')
...     timeout = result.get_one()[0]
...     # Convert to milliseconds and round to the nearest 10ms.
...     if timeout == '0':
...         timeout = 0
...     elif timeout.endswith('ms'):
...         timeout = int(timeout[:-2])
...     elif timeout.endswith('s'):
...         timeout = int(timeout[:-1]) * 1000
...     else:
...         raise ValueError('Unknown timeout value: %s' % timeout)
...     return '%dms' % round(timeout, -1)
...

>>> def reset_store():
...     global store
...     zstorm = getUtility(IZStorm)
...     zstorm.remove(store)
...     transaction.abort()
...     store.close()
...     store = getUtility(IStoreSelector).get(
...         MAIN_STORE, MASTER_FLAVOR)

>>> set_request_started()
>>> print current_statement_timeout(store)
0ms
>>> clear_request_started()

Using the builtin pg_sleep() function, we can trigger the timeout by
sleeping for 200ms with a 100ms statement timeout:

>>> from textwrap import dedent
>>> test_data = dedent("""
...     [database]
...     db_statement_timeout: 100
...     """)
>>> config.push('base_test_data', test_data)
>>> reset_store()
>>> set_request_started()
>>> print current_statement_timeout(store)
100ms
>>> store.execute('SELECT pg_sleep(0.200)', noresult=True)
Traceback (most recent call last):
...
LaunchpadTimeoutError: Statement: 'SELECT pg_sleep(0.200)'
Parameters:()
Original error: QueryCanceledError('canceling statement due to statement timeout\n',)

Even though the statement timed out, it is recorded in the statement log:

>>> print get_request_statements()[-1][3]
SELECT pg_sleep(0.200)
>>> clear_request_started()
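As noted above, the configured value is applied as PostgreSQL's
per-connection statement_timeout setting. The line below is an
illustration of that effect only, not the adapter's actual implementation
and not one of the tested examples; with db_statement_timeout set to 100,
each statement runs roughly as if the connection had been configured like
this:

    # Sketch only: roughly what a 100ms db_statement_timeout amounts to.
    store.execute('SET statement_timeout TO 100', noresult=True)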
It is possible to disable timeouts; they should be disabled when
set_request_started() is called from scripts.

>>> reset_store()
>>> set_request_started(enable_timeout=False)
>>> print current_statement_timeout(store)
0ms
>>> store.execute('SELECT pg_sleep(0.200)', noresult=True)
>>> clear_request_started()

The statement_timeout will be adjusted during the transaction, within a
certain precision. To test this reliably, we need a time machine to avoid
random failures under load. Let's build one and plug it in:

>>> _now = 0
>>> def fake_time():
...     return float(_now)
>>> def time_travel(delta):
...     global _now
...     _now += delta
>>> lp.services.webapp.adapter.time = fake_time  # Monkey patch

Now issue three statements: the first two are issued before the precision
interval has elapsed, the third after it has been exceeded. We use the
time machine to fake how long things take.

>>> test_data = dedent("""
...     [database]
...     db_statement_timeout: 10000
...     db_statement_timeout_precision: 1000
...     """)
>>> config.push('test', test_data)
>>> reset_store()
>>> set_request_started()
>>> store.execute('SELECT TRUE', noresult=True)
>>> print current_statement_timeout(store)
10000ms
>>> time_travel(0.5)  # Forward in time 0.5 seconds
>>> store.execute('SELECT TRUE', noresult=True)
>>> print current_statement_timeout(store)
10000ms
>>> time_travel(0.6)  # Forward in time 0.6 seconds, now over precision

On this invocation the PostgreSQL statement timeout will be updated before
the SQL command is issued, as we have exceeded the precision period:

>>> store.execute('SELECT TRUE', noresult=True)
>>> print current_statement_timeout(store)
8900ms
>>> time_travel(8.89)  # 0.01s remaining before hard timeout

On this final invocation we actually sleep, to ensure that the timeout
reported by PostgreSQL is really enforced:

>>> store.execute('SELECT pg_sleep(0.2)', noresult=True)
Traceback (most recent call last):
...
LaunchpadTimeoutError: Statement: 'SELECT pg_sleep(0.2)'
Parameters:()
Original error: QueryCanceledError('canceling statement due to statement timeout\n',)
>>> clear_request_started()

Set the timeout to 5000ms for the next tests:

>>> test_data = dedent("""
...     [database]
...     db_statement_timeout: 5000
...     """)
>>> config.push('test', test_data)
>>> reset_store()
>>> set_request_started()
>>> print current_statement_timeout(store)
5000ms
>>> clear_request_started()

Turn off the time machine so nobody hurts themselves:

>>> import time
>>> lp.services.webapp.adapter.time = time.time
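For reference, the 8900ms value seen above follows from simple arithmetic
on the request clock. The snippet below is a simplified model of that
behaviour, an assumption about the mechanism rather than the adapter's
code, and not one of the tested examples:

    # Simplified model: the timeout is only re-issued once the elapsed
    # time exceeds the precision, and the new value is the remaining
    # budget for the request.
    db_statement_timeout = 10000   # ms, from db_statement_timeout
    precision = 1000               # ms, from db_statement_timeout_precision
    elapsed = (0.5 + 0.6) * 1000   # ms elapsed since the request started
    if elapsed > precision:
        print '%dms' % (db_statement_timeout - elapsed)  # prints 8900ms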
Request Timeout
===============

While the PostgreSQL statement timeout can cut short some out-of-control
requests, it will not help when a request performs a large number of small
queries. To help with this, the set_request_started() and
clear_request_started() functions are provided as hooks for the web
publisher. If a request exceeds the timeout, the execute() method on
cursors will start raising an exception.

Signal the start of a request:

>>> set_request_started()

Perform an operation before the time limit expires:

>>> store.execute('SELECT 1', noresult=True)

Once the request has been completed, clear_request_started() should be
called:

>>> clear_request_started()

The request start time can also be passed to set_request_started(). Set
the request start time to 1 minute in the past, and execute another query:

>>> set_request_started(time.time() - 60)
>>> store.execute('SELECT 2', noresult=True)
Traceback (most recent call last):
...
RequestExpired: request expired.

The statement that was about to be executed is still recorded in the
statement log, even though the request time limit was exceeded before it
was issued to the database:

>>> get_request_statements()
[(0, 0, 'SQL-main-master', 'SELECT 2', ...)]

When a RequestExpired exception is raised, the current transaction will be
doomed:

>>> clear_request_started()
>>> transaction.get().isDoomed()
True
>>> transaction.commit()
Traceback (most recent call last):
...
DoomedTransaction

Cleanup:

>>> transaction.abort()

The Launchpad database adapter assumes that each thread services a
different request, so a request timing out on one thread will not affect
other threads:

>>> started_request = threading.Event()
>>> statement_issued = threading.Event()
>>> def foo():
...     set_request_started(time.time() - 60)  # timed out
...     started_request.set()
...     statement_issued.wait()
...
>>> set_request_started()
>>> thread = threading.Thread(target=foo)
>>> thread.start()
>>> started_request.wait()
>>> store.execute('SELECT 1', noresult=True)
>>> statement_issued.set()
>>> thread.join()
>>> clear_request_started()

Similarly, starting a new request in another thread will not reset the
timeout for the current thread:

>>> started_request = threading.Event()
>>> statement_issued = threading.Event()
>>> def bar():
...     set_request_started()
...     started_request.set()
...     statement_issued.wait()
...
>>> set_request_started(time.time() - 60)
>>> thread = threading.Thread(target=bar)
>>> thread.start()
>>> started_request.wait()
>>> store.execute('SELECT 1', noresult=True)
Traceback (most recent call last):
...
RequestExpired: request expired.
>>> statement_issued.set()
>>> thread.join()
>>> clear_request_started()
>>> transaction.abort()

If no timeout has been set, then requests will not time out:

>>> test_data = dedent("""
...     [database]
...     db_statement_timeout: none
...     """)
>>> config.push('test', test_data)
>>> reset_store()
>>> set_request_started(time.time() - 60)
>>> store.execute('SELECT 1', noresult=True)
>>> clear_request_started()

Switching Database Users
========================

The Launchpad store uses lp.services.config to configure its connection.
This can be adjusted by choosing a different database config section. By
default we connect as the "launchpad_main" user:

>>> print store.execute("select current_user").get_one()[0]
launchpad_main

>>> from lp.services.config import dbconfig
>>> dbconfig.override(dbuser='statistician')
>>> reset_store()
>>> print store.execute("select current_user").get_one()[0]
statistician
>>> store.execute("""
...     INSERT INTO SourcePackageName(name) VALUES ('fnord4')
...     """, noresult=True)
Traceback (most recent call last):
...
ProgrammingError: permission denied for relation sourcepackagename

This is not reset at the end of the transaction:

>>> transaction.abort()
>>> print store.execute("select current_user").get_one()[0]
statistician
>>> store.execute("""
...     INSERT INTO SourcePackageName(name) VALUES ('fnord4')
...     """, noresult=True)
Traceback (most recent call last):
...
ProgrammingError: permission denied for relation sourcepackagename
>>> transaction.abort()

So you need to explicitly set the user back to the default:

>>> dbconfig.override(dbuser=None)
>>> reset_store()
>>> print store.execute("select current_user").get_one()[0]
launchpad_main
>>> store.execute("""
...     INSERT INTO SourcePackageName(name) VALUES ('fnord4')
...     """, noresult=True)

Reset our config:

>>> base_test_data = config.pop('base_test_data')