1
# Copyright 2009 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""A self maintaining mock database for tests.
6
The first time a `MockDbConnection` is used, it functions as a proxy
7
to the real database connection. Queries and results are recorded into
10
For subsequent runs, if the same queries are issued in the same order
11
then results are returned from the script and the real database is
12
not used. If the script is detected as being invalid, it is removed and
13
a `RetryTest` exception raised for the test runner to deal with.
18
'MockDbConnection', 'script_filename',
19
'ScriptPlayer', 'ScriptRecorder',
22
import cPickle as pickle
28
# from zope.testing.testrunner import RetryTest
30
from canonical.config import config
33
SCRIPT_DIR = os.path.join(config.root, 'mockdbscripts~')
36
def script_filename(key):
37
"""Calculate and return the script filename to use."""
38
key = urllib.quote(key, safe='')
39
return os.path.join(SCRIPT_DIR, key) + '.pickle.gz'
43
"""An entry in our test's log of database calls."""
45
# The connection number used for this command. All connections used
46
# by a test store their commands in a single list to preserve global
47
# ordering, and we use connection_number to differentiate them.
48
connection_number = None
50
# If the command raised an exception, it is stored here.
53
def __init__(self, connection):
54
if connection is not None:
55
self.connection_number = connection.connection_number
58
class ConnectScriptEntry(ScriptEntry):
59
"""An entry created instantiating a Connection."""
60
args = None # Arguments passed to the connect() method.
61
kw = None # Keyword arguments passed to the connect() method.
63
def __init__(self, connection, *args, **kw):
64
super(ConnectScriptEntry, self).__init__(connection)
69
class ExecuteScriptEntry(ScriptEntry):
70
"""An entry created via Cursor.execute()."""
71
query = None # Query passed to Cursor.execute().
72
params = None # Parameters passed to Cursor.execute().
73
results = None # Cursor.fetchall() results as a list.
74
description = None # Cursor.description as per DB-API.
75
rowcount = None # Cursor.rowcount after Cursor.fetchall() as per DB-API.
77
def __init__(self, connection, query, params):
78
super(ExecuteScriptEntry, self).__init__(connection)
83
class CloseScriptEntry(ScriptEntry):
84
"""An entry created via Connection.close()."""
87
class CommitScriptEntry(ScriptEntry):
88
"""An entry created via Connection.commit()."""
91
class RollbackScriptEntry(ScriptEntry):
92
"""An entry created via Connection.rollback()."""
95
class SetIsolationLevelScriptEntry(ScriptEntry):
96
"""An entry created via Connection.set_isolation_level()."""
97
level = None # The requested isolation level
98
def __init__(self, connection, level):
99
super(SetIsolationLevelScriptEntry, self).__init__(connection)
103
class ScriptRecorder:
104
key = None # A unique key identifying this test.
105
script_filename = None # Path to our script file.
109
def __init__(self, key):
111
self.script_filename = script_filename(key)
113
self.connections = []
115
def connect(self, connect_func, *args, **kw):
116
"""Open a connection to the database, returning a `MockDbConnection`.
119
connection = connect_func(*args, **kw)
121
except (psycopg2.Warning, psycopg2.Error), connect_exception:
123
exception = connect_exception
125
connection = MockDbConnection(self, connection, *args, **kw)
127
self.connections.append(connection)
128
if connection is not None:
129
connection.connection_number = self.connections.index(connection)
130
entry = ConnectScriptEntry(connection, *args, **kw)
131
self.log.append(entry)
133
entry.exception = exception
134
#pylint: disable-msg=W0706
138
def cursor(self, connection):
139
"""Return a MockDbCursor."""
140
real_cursor = connection.real_connection.cursor()
141
return MockDbCursor(connection, real_cursor)
143
def execute(self, cursor, query, params=None):
144
"""Handle Cursor.execute()."""
145
con = cursor.connection
146
entry = ExecuteScriptEntry(con, query, params)
148
real_cursor = cursor.real_cursor
150
real_cursor.execute(query, params)
151
except (psycopg2.Warning, psycopg2.Error), exception:
152
entry.exception = exception
153
self.log.append(entry)
156
entry.rowcount = real_cursor.rowcount
158
entry.results = list(real_cursor.fetchall())
159
entry.description = real_cursor.description
160
# Might have changed now fetchall() has been done.
161
entry.rowcount = real_cursor.rowcount
162
except psycopg2.ProgrammingError, exc:
163
if exc.args[0] == 'no results to fetch':
164
# No results, such as an UPDATE query.
169
self.log.append(entry)
172
def close(self, connection):
173
"""Handle Connection.close()."""
174
entry = CloseScriptEntry(connection)
176
if connection.real_connection is not None:
177
connection.real_connection.close()
178
except (psycopg2.Warning, psycopg2.Error), exception:
179
entry.exception = exception
180
self.log.append(entry)
183
self.log.append(entry)
185
def commit(self, connection):
186
"""Handle Connection.commit()."""
187
entry = CommitScriptEntry(connection)
189
connection.real_connection.commit()
190
except (psycopg2.Warning, psycopg2.Error), exception:
191
entry.exception = exception
192
self.log.append(entry)
195
self.log.append(entry)
197
def rollback(self, connection):
198
"""Handle Connection.rollback()."""
199
entry = RollbackScriptEntry(connection)
201
connection.real_connection.rollback()
202
except (psycopg2.Warning, psycopg2.Error), exception:
203
entry.exception = exception
204
self.log.append(entry)
207
self.log.append(entry)
209
def set_isolation_level(self, connection, level):
210
"""Handle Connection.set_isolation_level()."""
211
entry = SetIsolationLevelScriptEntry(connection, level)
213
connection.real_connection.set_isolation_level(level)
214
except (psycopg2.Warning, psycopg2.Error), exception:
215
entry.exception = exception
216
self.log.append(entry)
219
self.log.append(entry)
222
"""Store the script for future use by a ScriptPlayer."""
223
# Create script directory if necessary.
224
if not os.path.isdir(SCRIPT_DIR):
225
os.makedirs(SCRIPT_DIR, mode=0700)
227
# Save log to a pickle. The pickle contains the key for this test,
228
# followed by a list of ScriptEntry-derived objects.
229
obj_to_store = [self.key] + self.log
231
obj_to_store, gzip.open(self.script_filename, 'wb'),
232
pickle.HIGHEST_PROTOCOL
235
# Trash all the connected connections. This isn't strictly necessary
236
# but protects us from silly mistakes.
237
while self.connections:
238
con = self.connections.pop()
239
if con is not None and not con._closed:
243
def noop_if_invalid(func):
244
"""Decorator that causes the decorated method to be a noop if the
245
script this method belongs too is invalid.
247
This allows teardown to complete when a ReplayScript has
248
raised a RetryTest exception. Normally during teardown DB operations
249
are made when doing things like aborting the transaction.
251
def dont_retry_func(self, *args, **kw):
255
return func(self, *args, **kw)
256
return dont_retry_func
260
"""Replay database queries from a script."""
262
key = None # Unique key identifying this test.
263
script_filename = None # File storing our statement/result script.
264
log = None # List of ScriptEntry objects loaded from _script_filename.
265
connections = None # List of connections using this script.
267
invalid = False # If True, the script is invalid and we are tearing down.
269
def __init__(self, key):
271
self.script_filename = script_filename(key)
272
self.log = pickle.load(gzip.open(self.script_filename, 'rb'))
274
stored_key = self.log.pop(0)
275
assert stored_key == key, "Script loaded for wrong key."
277
self.handleInvalidScript(
278
"Connection key not stored in script."
281
self.connections = []
283
def getNextEntry(self, connection, expected_entry_class):
284
"""Pull the next entry from the script.
286
Invokes handleInvalidScript on error, including some entry validation.
289
entry = self.log.pop(0)
291
self.handleInvalidScript('Ran out of commands.')
293
# This guards against file format changes as well as file corruption.
294
if not isinstance(entry, ScriptEntry):
295
self.handleInvalidScript('Unexpected object type in script')
297
if connection.connection_number != entry.connection_number:
298
self.handleInvalidScript(
299
'Expected query to connection %s '
300
'but got query to connection %s'
301
% (entry.connection_number, connection.connection_number)
304
if not isinstance(entry, expected_entry_class):
305
self.handleInvalidScript(
306
'Expected %s but got %s'
307
% (expected_entry_class, entry.__class__)
313
def connect(self, connect_func, *args, **kw):
314
"""Return a `MockDbConnection`.
316
Does not actually connect to the database - we are in replay mode.
318
connection = MockDbConnection(self, None, *args, **kw)
319
self.connections.append(connection)
320
connection.connection_number = self.connections.index(connection)
321
entry = self.getNextEntry(connection, ConnectScriptEntry)
322
if (entry.args, entry.kw) != (args, kw):
323
self.handleInvalidScript("Connection parameters have changed.")
324
if entry.exception is not None:
325
raise entry.exception
328
def cursor(self, connection):
329
"""Return a MockDbCursor."""
330
return MockDbCursor(connection, real_cursor=None)
333
def execute(self, cursor, query, params=None):
334
"""Handle Cursor.execute()."""
335
connection = cursor.connection
336
entry = self.getNextEntry(connection, ExecuteScriptEntry)
338
if query != entry.query:
339
self.handleInvalidScript(
340
'Unexpected command. Expected %s. Got %s.'
341
% (entry.query, query)
344
if params != entry.params:
345
self.handleInvalidScript(
346
'Unexpected parameters. Expected %r. Got %r.'
347
% (entry.params, params)
350
if entry.exception is not None:
351
raise entry.exception
356
def close(self, connection):
357
"""Handle Connection.close()."""
358
entry = self.getNextEntry(connection, CloseScriptEntry)
359
if entry.exception is not None:
360
raise entry.exception
362
def commit(self, connection):
363
"""Handle Connection.commit()."""
364
entry = self.getNextEntry(connection, CommitScriptEntry)
365
if entry.exception is not None:
366
raise entry.exception
369
def rollback(self, connection):
370
"""Handle Connection.rollback()."""
371
entry = self.getNextEntry(connection, RollbackScriptEntry)
372
if entry.exception is not None:
373
raise entry.exception
376
def set_isolation_level(self, connection, level):
377
"""Handle Connection.set_isolation_level()."""
378
entry = self.getNextEntry(connection, SetIsolationLevelScriptEntry)
379
if entry.level != level:
380
self.handleInvalidScript("Different isolation level requested.")
381
if entry.exception is not None:
382
raise entry.exception
384
def handleInvalidScript(self, reason):
385
"""Remove the script from disk and raise a RetryTest exception."""
386
if os.path.exists(self.script_filename):
387
os.unlink(self.script_filename)
389
raise RetryTest(reason) # Leaving this as a name error: this should
390
# not be called unless we reinstate the retry behavior in zope.testing.
393
class MockDbConnection:
394
"""Connection to our Mock database."""
396
real_connection = None
397
connection_number = None
400
def __init__(self, script, real_connection, *args, **kw):
401
"""Initialize the `MockDbConnection`.
403
`MockDbConnection` intances are generally only created in the
404
*Script.connect() methods as the attempt needs to be recorded
407
If we have a real_connection, we are proxying and recording results.
408
If real_connection is None, we are replaying results from the script.
410
*args and **kw are the arguments passed to open the real connection
411
and are used by the script to confirm the db connection details have
412
not been changed; a `RetryTest` exception may be raised in replay mode.
415
self.real_connection = real_connection
419
return self.script.cursor(self)
423
def _checkClosed(self):
424
"""Guard that raises an exception if the connection is closed."""
425
if self._closed is True:
426
raise psycopg2.InterfaceError('Connection closed.')
430
# DB-API says closing a closed connection should raise an exception
431
# ("exception will be raised if any operation is attempted
432
# wht the [closed] connection"), but psycopg1 doesn't do this.
433
# It would be nice if our wrapper could be more strict than psycopg1,
434
# but unfortunately the sqlos/sqlobject combination relies on this
435
# behavior. So we have to emulate it.
439
self.script.close(self)
445
self.script.commit(self)
450
self.script.rollback(self)
452
def set_isolation_level(self, level):
453
"""As per psycopg1 extension."""
455
self.script.set_isolation_level(self, level)
457
# Exceptions exposed on connection, as per optional DB-API extension.
458
## Disabled, as psycopg1 does not implement this extension.
459
## Warning = psycopg.Warning
460
## Error = psycopg.Error
461
## InterfaceError = psycopg.InterfaceError
462
## DatabaseError = psycopg.DatabaseError
463
## DataError = psycopg.DataError
464
## OperationalError = psycopg.OperationalError
465
## IntegrityError = psycopg.IntegrityError
466
## InternalError = psycopg.InternalError
467
## ProgrammingError = psycopg.ProgrammingError
468
## NotSupportedError = psycopg.NotSupportedError
472
"""A fake DB-API cursor as produced by MockDbConnection.cursor.
474
The real work is done by the associated ScriptRecorder or ScriptPlayer
475
using the common interface, making this class independent on what
476
mode the mock database is running in.
478
# The ExecuteScriptEntry for the in progress query, if there is one.
479
# It stores any unfetched results and cursor metadata such as the
480
# resultset description.
483
arraysize = 100 # As per DB-API.
484
connection = None # As per DB-API optional extension.
485
real_cursor = None # The real cursor if there is one.
487
def __init__(self, connection, real_cursor=None):
488
self.connection = connection
489
self.real_cursor = real_cursor
492
def description(self):
493
"""As per DB-API, pulled from the script entry."""
494
if self._script_entry is None:
496
return self._script_entry.description
500
"""Return the rowcount only if all the results have been consumed.
502
As per DB-API, pulled from the script entry.
504
if not isinstance(self._script_entry, ExecuteScriptEntry):
507
results = self._script_entry.results
509
if results is None: # DELETE or UPDATE set rowcount.
510
return self._script_entry.rowcount
512
if results is None or self._fetch_position < len(results):
514
return self._script_entry.rowcount
522
if self.real_cursor is not None:
523
self.real_cursor.close()
524
self.real_cursor = None
525
self.connection = None
527
def _checkClosed(self):
528
"""Raise an exception if the cursor or connection is closed."""
529
if self._closed is True:
530
raise psycopg2.Error('Cursor closed.')
531
self.connection._checkClosed()
533
# Index in our results that the next fetch will return. We don't consume
534
# the results list: if we are recording we need to keep the results
535
# so we can serialize at the end of the test.
538
def execute(self, query, parameters=None):
541
self._script_entry = self.connection.script.execute(
542
self, query, parameters
544
self._fetch_position = 0
546
def executemany(self, query, seq_of_parameters=None):
549
raise NotImplementedError('executemany')
554
if self._script_entry is None:
555
raise psycopg2.Error("No query issued yet")
556
if self._script_entry.results is None:
557
raise psycopg2.Error("Query returned no results")
559
row = self._script_entry.results[self._fetch_position]
560
self._fetch_position += 1
565
def fetchmany(self, size=None):
568
raise NotImplementedError('fetchmany')
573
if self._script_entry is None:
574
raise psycopg2.Error('No query issued yet')
575
if self._script_entry.results is None:
576
raise psycopg2.Error('Query returned no results')
577
results = self._script_entry.results[self._fetch_position:]
578
self._fetch_position = len(results)
584
raise NotImplementedError('nextset')
586
def setinputsizes(self, sizes):
591
def setoutputsize(self, size, column=None):
596
## psycopg1 does not support this extension.
599
## """As per iterator spec and DB-API optional extension."""
600
## row = self.fetchone()
602
## raise StopIteration
606
## def __iter__(self):
607
## """As per iterator spec and DB-API optional extension."""