~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/canonical/testing/mockdb.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2004-08-03 09:17:25 UTC
  • mfrom: (unknown (missing))
  • Revision ID: Arch-1:rocketfuel@canonical.com%launchpad--devel--0--patch-19
Removed defaultSkin directive to make launchpad work with the latest zope.
Patches applied:

 * steve.alexander@canonical.com/launchpad--devel--0--patch-16
   merge from rocketfuel

 * steve.alexander@canonical.com/launchpad--devel--0--patch-17
   removed use of defaultSkin directive, which has been removed from zope3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2009 Canonical Ltd.  This software is licensed under the
2
 
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
 
4
 
"""A self maintaining mock database for tests.
5
 
 
6
 
The first time a `MockDbConnection` is used, it functions as a proxy
7
 
to the real database connection. Queries and results are recorded into
8
 
a script.
9
 
 
10
 
For subsequent runs, if the same queries are issued in the same order
11
 
then results are returned from the script and the real database is
12
 
not used. If the script is detected as being invalid, it is removed and
13
 
a `RetryTest` exception raised for the test runner to deal with.
14
 
"""
15
 
 
16
 
__metaclass__ = type
17
 
__all__ = [
18
 
        'MockDbConnection', 'script_filename',
19
 
        'ScriptPlayer', 'ScriptRecorder',
20
 
        ]
21
 
 
22
 
import cPickle as pickle
23
 
import gzip
24
 
import os.path
25
 
import urllib
26
 
 
27
 
import psycopg2
28
 
# from zope.testing.testrunner import RetryTest
29
 
 
30
 
from canonical.config import config
31
 
 
32
 
 
33
 
SCRIPT_DIR = os.path.join(config.root, 'mockdbscripts~')
34
 
 
35
 
 
36
 
def script_filename(key):
37
 
    """Calculate and return the script filename to use."""
38
 
    key = urllib.quote(key, safe='')
39
 
    return os.path.join(SCRIPT_DIR, key) + '.pickle.gz'
40
 
 
41
 
 
42
 
class ScriptEntry:
43
 
    """An entry in our test's log of database calls."""
44
 
 
45
 
    # The connection number used for this command. All connections used
46
 
    # by a test store their commands in a single list to preserve global
47
 
    # ordering, and we use connection_number to differentiate them.
48
 
    connection_number = None
49
 
 
50
 
    # If the command raised an exception, it is stored here.
51
 
    exception = None
52
 
 
53
 
    def __init__(self, connection):
54
 
        if connection is not None:
55
 
            self.connection_number = connection.connection_number
56
 
 
57
 
 
58
 
class ConnectScriptEntry(ScriptEntry):
59
 
    """An entry created instantiating a Connection."""
60
 
    args = None # Arguments passed to the connect() method.
61
 
    kw = None # Keyword arguments passed to the connect() method.
62
 
    
63
 
    def __init__(self, connection, *args, **kw):
64
 
        super(ConnectScriptEntry, self).__init__(connection)
65
 
        self.args = args
66
 
        self.kw = kw
67
 
 
68
 
 
69
 
class ExecuteScriptEntry(ScriptEntry):
70
 
    """An entry created via Cursor.execute()."""
71
 
    query = None # Query passed to Cursor.execute().
72
 
    params = None # Parameters passed to Cursor.execute().
73
 
    results = None # Cursor.fetchall() results as a list.
74
 
    description = None # Cursor.description as per DB-API.
75
 
    rowcount = None # Cursor.rowcount after Cursor.fetchall() as per DB-API.
76
 
 
77
 
    def __init__(self, connection, query, params):
78
 
        super(ExecuteScriptEntry, self).__init__(connection)
79
 
        self.query = query
80
 
        self.params = params
81
 
 
82
 
 
83
 
class CloseScriptEntry(ScriptEntry):
84
 
    """An entry created via Connection.close()."""
85
 
 
86
 
 
87
 
class CommitScriptEntry(ScriptEntry):
88
 
    """An entry created via Connection.commit()."""
89
 
 
90
 
 
91
 
class RollbackScriptEntry(ScriptEntry):
92
 
    """An entry created via Connection.rollback()."""
93
 
 
94
 
 
95
 
class SetIsolationLevelScriptEntry(ScriptEntry):
96
 
    """An entry created via Connection.set_isolation_level()."""
97
 
    level = None # The requested isolation level
98
 
    def __init__(self, connection, level):
99
 
        super(SetIsolationLevelScriptEntry, self).__init__(connection)
100
 
        self.level = level
101
 
 
102
 
 
103
 
class ScriptRecorder:
104
 
    key = None # A unique key identifying this test.
105
 
    script_filename = None # Path to our script file.
106
 
    log = None
107
 
    connections = None
108
 
 
109
 
    def __init__(self, key):
110
 
        self.key = key
111
 
        self.script_filename = script_filename(key)
112
 
        self.log = []
113
 
        self.connections = []
114
 
 
115
 
    def connect(self, connect_func, *args, **kw):
116
 
        """Open a connection to the database, returning a `MockDbConnection`.
117
 
        """
118
 
        try:
119
 
            connection = connect_func(*args, **kw)
120
 
            exception = None
121
 
        except (psycopg2.Warning, psycopg2.Error), connect_exception:
122
 
            connection = None
123
 
            exception = connect_exception
124
 
 
125
 
        connection = MockDbConnection(self, connection, *args, **kw)
126
 
 
127
 
        self.connections.append(connection)
128
 
        if connection is not None:
129
 
            connection.connection_number = self.connections.index(connection)
130
 
        entry = ConnectScriptEntry(connection, *args, **kw)
131
 
        self.log.append(entry)
132
 
        if exception:
133
 
            entry.exception = exception
134
 
            #pylint: disable-msg=W0706
135
 
            raise exception
136
 
        return connection
137
 
 
138
 
    def cursor(self, connection):
139
 
        """Return a MockDbCursor."""
140
 
        real_cursor = connection.real_connection.cursor()
141
 
        return MockDbCursor(connection, real_cursor)
142
 
 
143
 
    def execute(self, cursor, query, params=None):
144
 
        """Handle Cursor.execute()."""
145
 
        con = cursor.connection
146
 
        entry = ExecuteScriptEntry(con, query, params)
147
 
 
148
 
        real_cursor = cursor.real_cursor
149
 
        try:
150
 
            real_cursor.execute(query, params)
151
 
        except (psycopg2.Warning, psycopg2.Error), exception:
152
 
            entry.exception = exception
153
 
            self.log.append(entry)
154
 
            raise
155
 
 
156
 
        entry.rowcount = real_cursor.rowcount
157
 
        try:
158
 
            entry.results = list(real_cursor.fetchall())
159
 
            entry.description = real_cursor.description
160
 
            # Might have changed now fetchall() has been done.
161
 
            entry.rowcount = real_cursor.rowcount
162
 
        except psycopg2.ProgrammingError, exc:
163
 
            if exc.args[0] == 'no results to fetch':
164
 
                # No results, such as an UPDATE query.
165
 
                entry.results = None
166
 
            else:
167
 
                raise
168
 
 
169
 
        self.log.append(entry)
170
 
        return entry
171
 
 
172
 
    def close(self, connection):
173
 
        """Handle Connection.close()."""
174
 
        entry = CloseScriptEntry(connection)
175
 
        try:
176
 
            if connection.real_connection is not None:
177
 
                connection.real_connection.close()
178
 
        except (psycopg2.Warning, psycopg2.Error), exception:
179
 
            entry.exception = exception
180
 
            self.log.append(entry)
181
 
            raise
182
 
        else:
183
 
            self.log.append(entry)
184
 
 
185
 
    def commit(self, connection):
186
 
        """Handle Connection.commit()."""
187
 
        entry = CommitScriptEntry(connection)
188
 
        try:
189
 
            connection.real_connection.commit()
190
 
        except (psycopg2.Warning, psycopg2.Error), exception:
191
 
            entry.exception = exception
192
 
            self.log.append(entry)
193
 
            raise
194
 
        else:
195
 
            self.log.append(entry)
196
 
 
197
 
    def rollback(self, connection):
198
 
        """Handle Connection.rollback()."""
199
 
        entry = RollbackScriptEntry(connection)
200
 
        try:
201
 
            connection.real_connection.rollback()
202
 
        except (psycopg2.Warning, psycopg2.Error), exception:
203
 
            entry.exception = exception
204
 
            self.log.append(entry)
205
 
            raise
206
 
        else:
207
 
            self.log.append(entry)
208
 
 
209
 
    def set_isolation_level(self, connection, level):
210
 
        """Handle Connection.set_isolation_level()."""
211
 
        entry = SetIsolationLevelScriptEntry(connection, level)
212
 
        try:
213
 
            connection.real_connection.set_isolation_level(level)
214
 
        except (psycopg2.Warning, psycopg2.Error), exception:
215
 
            entry.exception = exception
216
 
            self.log.append(entry)
217
 
            raise
218
 
        else:
219
 
            self.log.append(entry)
220
 
 
221
 
    def store(self):
222
 
        """Store the script for future use by a ScriptPlayer."""
223
 
        # Create script directory if necessary.
224
 
        if not os.path.isdir(SCRIPT_DIR):
225
 
            os.makedirs(SCRIPT_DIR, mode=0700)
226
 
 
227
 
        # Save log to a pickle. The pickle contains the key for this test,
228
 
        # followed by a list of ScriptEntry-derived objects.
229
 
        obj_to_store = [self.key] + self.log
230
 
        pickle.dump(
231
 
                obj_to_store, gzip.open(self.script_filename, 'wb'),
232
 
                pickle.HIGHEST_PROTOCOL
233
 
                )
234
 
 
235
 
        # Trash all the connected connections. This isn't strictly necessary
236
 
        # but protects us from silly mistakes.
237
 
        while self.connections:
238
 
            con = self.connections.pop()
239
 
            if con is not None and not con._closed:
240
 
                con.close()
241
 
 
242
 
 
243
 
def noop_if_invalid(func):
244
 
    """Decorator that causes the decorated method to be a noop if the
245
 
    script this method belongs too is invalid.
246
 
 
247
 
    This allows teardown to complete when a ReplayScript has
248
 
    raised a RetryTest exception. Normally during teardown DB operations
249
 
    are made when doing things like aborting the transaction.
250
 
    """
251
 
    def dont_retry_func(self, *args, **kw):
252
 
        if self.invalid:
253
 
            return None
254
 
        else:
255
 
            return func(self, *args, **kw)
256
 
    return dont_retry_func
257
 
 
258
 
 
259
 
class ScriptPlayer:
260
 
    """Replay database queries from a script."""
261
 
 
262
 
    key = None # Unique key identifying this test.
263
 
    script_filename = None # File storing our statement/result script.
264
 
    log = None # List of ScriptEntry objects loaded from _script_filename.
265
 
    connections = None # List of connections using this script.
266
 
 
267
 
    invalid = False # If True, the script is invalid and we are tearing down.
268
 
 
269
 
    def __init__(self, key):
270
 
        self.key = key
271
 
        self.script_filename = script_filename(key)
272
 
        self.log = pickle.load(gzip.open(self.script_filename, 'rb'))
273
 
        try:
274
 
            stored_key = self.log.pop(0)
275
 
            assert stored_key == key, "Script loaded for wrong key."
276
 
        except IndexError:
277
 
            self.handleInvalidScript(
278
 
                    "Connection key not stored in script."
279
 
                    )
280
 
 
281
 
        self.connections = []
282
 
 
283
 
    def getNextEntry(self, connection, expected_entry_class):
284
 
        """Pull the next entry from the script.
285
 
 
286
 
        Invokes handleInvalidScript on error, including some entry validation.
287
 
        """
288
 
        try:
289
 
            entry = self.log.pop(0)
290
 
        except IndexError:
291
 
            self.handleInvalidScript('Ran out of commands.')
292
 
 
293
 
        # This guards against file format changes as well as file corruption.
294
 
        if not isinstance(entry, ScriptEntry):
295
 
            self.handleInvalidScript('Unexpected object type in script')
296
 
 
297
 
        if connection.connection_number != entry.connection_number:
298
 
            self.handleInvalidScript(
299
 
                    'Expected query to connection %s '
300
 
                    'but got query to connection %s'
301
 
                    % (entry.connection_number, connection.connection_number)
302
 
                    )
303
 
 
304
 
        if not isinstance(entry, expected_entry_class):
305
 
            self.handleInvalidScript(
306
 
                    'Expected %s but got %s'
307
 
                    % (expected_entry_class, entry.__class__)
308
 
                    )
309
 
 
310
 
        return entry
311
 
 
312
 
    @noop_if_invalid
313
 
    def connect(self, connect_func, *args, **kw):
314
 
        """Return a `MockDbConnection`.
315
 
       
316
 
        Does not actually connect to the database - we are in replay mode.
317
 
        """
318
 
        connection = MockDbConnection(self, None, *args, **kw)
319
 
        self.connections.append(connection)
320
 
        connection.connection_number = self.connections.index(connection)
321
 
        entry = self.getNextEntry(connection, ConnectScriptEntry)
322
 
        if (entry.args, entry.kw) != (args, kw):
323
 
            self.handleInvalidScript("Connection parameters have changed.")
324
 
        if entry.exception is not None:
325
 
            raise entry.exception
326
 
        return connection
327
 
 
328
 
    def cursor(self, connection):
329
 
        """Return a MockDbCursor."""
330
 
        return MockDbCursor(connection, real_cursor=None)
331
 
 
332
 
    @noop_if_invalid
333
 
    def execute(self, cursor, query, params=None):
334
 
        """Handle Cursor.execute()."""
335
 
        connection = cursor.connection
336
 
        entry = self.getNextEntry(connection, ExecuteScriptEntry)
337
 
 
338
 
        if query != entry.query:
339
 
            self.handleInvalidScript(
340
 
                    'Unexpected command. Expected %s. Got %s.'
341
 
                    % (entry.query, query)
342
 
                    )
343
 
 
344
 
        if params != entry.params:
345
 
            self.handleInvalidScript(
346
 
                    'Unexpected parameters. Expected %r. Got %r.'
347
 
                    % (entry.params, params)
348
 
                    )
349
 
 
350
 
        if entry.exception is not None:
351
 
            raise entry.exception
352
 
 
353
 
        return entry
354
 
 
355
 
    @noop_if_invalid
356
 
    def close(self, connection):
357
 
        """Handle Connection.close()."""
358
 
        entry = self.getNextEntry(connection, CloseScriptEntry)
359
 
        if entry.exception is not None:
360
 
            raise entry.exception
361
 
 
362
 
    def commit(self, connection):
363
 
        """Handle Connection.commit()."""
364
 
        entry = self.getNextEntry(connection, CommitScriptEntry)
365
 
        if entry.exception is not None:
366
 
            raise entry.exception
367
 
 
368
 
    @noop_if_invalid
369
 
    def rollback(self, connection):
370
 
        """Handle Connection.rollback()."""
371
 
        entry = self.getNextEntry(connection, RollbackScriptEntry)
372
 
        if entry.exception is not None:
373
 
            raise entry.exception
374
 
 
375
 
    @noop_if_invalid
376
 
    def set_isolation_level(self, connection, level):
377
 
        """Handle Connection.set_isolation_level()."""
378
 
        entry = self.getNextEntry(connection, SetIsolationLevelScriptEntry)
379
 
        if entry.level != level:
380
 
            self.handleInvalidScript("Different isolation level requested.")
381
 
        if entry.exception is not None:
382
 
            raise entry.exception
383
 
 
384
 
    def handleInvalidScript(self, reason):
385
 
        """Remove the script from disk and raise a RetryTest exception."""
386
 
        if os.path.exists(self.script_filename):
387
 
            os.unlink(self.script_filename)
388
 
        self.invalid = True
389
 
        raise RetryTest(reason) # Leaving this as a name error: this should
390
 
        # not be called unless we reinstate the retry behavior in zope.testing.
391
 
 
392
 
 
393
 
class MockDbConnection:
394
 
    """Connection to our Mock database."""
395
 
 
396
 
    real_connection = None
397
 
    connection_number = None
398
 
    script = None
399
 
 
400
 
    def __init__(self, script, real_connection, *args, **kw):
401
 
        """Initialize the `MockDbConnection`.
402
 
 
403
 
        `MockDbConnection` intances are generally only created in the
404
 
        *Script.connect() methods as the attempt needs to be recorded
405
 
        (even if it fails).
406
 
 
407
 
        If we have a real_connection, we are proxying and recording results.
408
 
        If real_connection is None, we are replaying results from the script.
409
 
 
410
 
        *args and **kw are the arguments passed to open the real connection
411
 
        and are used by the script to confirm the db connection details have
412
 
        not been changed; a `RetryTest` exception may be raised in replay mode.
413
 
        """
414
 
        self.script = script
415
 
        self.real_connection = real_connection
416
 
 
417
 
    def cursor(self):
418
 
        """As per DB-API."""
419
 
        return self.script.cursor(self)
420
 
 
421
 
    _closed = False
422
 
 
423
 
    def _checkClosed(self):
424
 
        """Guard that raises an exception if the connection is closed."""
425
 
        if self._closed is True:
426
 
            raise psycopg2.InterfaceError('Connection closed.')
427
 
 
428
 
    def close(self):
429
 
        """As per DB-API."""
430
 
        # DB-API says closing a closed connection should raise an exception
431
 
        # ("exception will be raised if any operation is attempted
432
 
        # wht the [closed] connection"), but psycopg1 doesn't do this.
433
 
        # It would be nice if our wrapper could be more strict than psycopg1,
434
 
        # but unfortunately the sqlos/sqlobject combination relies on this
435
 
        # behavior. So we have to emulate it.
436
 
        if self._closed:
437
 
            return
438
 
        #self._checkClosed()
439
 
        self.script.close(self)
440
 
        self._closed = True
441
 
 
442
 
    def commit(self):
443
 
        """As per DB-API."""
444
 
        self._checkClosed()
445
 
        self.script.commit(self)
446
 
 
447
 
    def rollback(self):
448
 
        """As per DB-API."""
449
 
        self._checkClosed()
450
 
        self.script.rollback(self)
451
 
 
452
 
    def set_isolation_level(self, level):
453
 
        """As per psycopg1 extension."""
454
 
        self._checkClosed()
455
 
        self.script.set_isolation_level(self, level)
456
 
 
457
 
    # Exceptions exposed on connection, as per optional DB-API extension.
458
 
    ## Disabled, as psycopg1 does not implement this extension.
459
 
    ## Warning = psycopg.Warning
460
 
    ## Error = psycopg.Error
461
 
    ## InterfaceError = psycopg.InterfaceError
462
 
    ## DatabaseError = psycopg.DatabaseError
463
 
    ## DataError = psycopg.DataError
464
 
    ## OperationalError = psycopg.OperationalError
465
 
    ## IntegrityError = psycopg.IntegrityError
466
 
    ## InternalError = psycopg.InternalError
467
 
    ## ProgrammingError = psycopg.ProgrammingError
468
 
    ## NotSupportedError = psycopg.NotSupportedError
469
 
 
470
 
 
471
 
class MockDbCursor:
472
 
    """A fake DB-API cursor as produced by MockDbConnection.cursor.
473
 
    
474
 
    The real work is done by the associated ScriptRecorder or ScriptPlayer
475
 
    using the common interface, making this class independent on what
476
 
    mode the mock database is running in.
477
 
    """
478
 
    # The ExecuteScriptEntry for the in progress query, if there is one.
479
 
    # It stores any unfetched results and cursor metadata such as the
480
 
    # resultset description.
481
 
    _script_entry = None
482
 
 
483
 
    arraysize = 100 # As per DB-API.
484
 
    connection = None # As per DB-API optional extension.
485
 
    real_cursor = None # The real cursor if there is one.
486
 
 
487
 
    def __init__(self, connection, real_cursor=None):
488
 
        self.connection = connection
489
 
        self.real_cursor = real_cursor
490
 
 
491
 
    @property
492
 
    def description(self):
493
 
        """As per DB-API, pulled from the script entry."""
494
 
        if self._script_entry is None:
495
 
            return None
496
 
        return self._script_entry.description
497
 
 
498
 
    @property
499
 
    def rowcount(self):
500
 
        """Return the rowcount only if all the results have been consumed.
501
 
 
502
 
        As per DB-API, pulled from the script entry.
503
 
        """
504
 
        if not isinstance(self._script_entry, ExecuteScriptEntry):
505
 
            return -1
506
 
 
507
 
        results = self._script_entry.results
508
 
 
509
 
        if results is None: # DELETE or UPDATE set rowcount.
510
 
            return self._script_entry.rowcount
511
 
        
512
 
        if results is None or self._fetch_position < len(results):
513
 
            return -1
514
 
        return self._script_entry.rowcount
515
 
 
516
 
    _closed = False
517
 
 
518
 
    def close(self):
519
 
        """As per DB-API."""
520
 
        self._checkClosed()
521
 
        self._closed = True
522
 
        if self.real_cursor is not None:
523
 
            self.real_cursor.close()
524
 
            self.real_cursor = None
525
 
            self.connection = None
526
 
 
527
 
    def _checkClosed(self):
528
 
        """Raise an exception if the cursor or connection is closed."""
529
 
        if self._closed is True:
530
 
            raise psycopg2.Error('Cursor closed.')
531
 
        self.connection._checkClosed()
532
 
 
533
 
    # Index in our results that the next fetch will return. We don't consume
534
 
    # the results list: if we are recording we need to keep the results 
535
 
    # so we can serialize at the end of the test.
536
 
    _fetch_position = 0
537
 
 
538
 
    def execute(self, query, parameters=None):
539
 
        """As per DB-API."""
540
 
        self._checkClosed()
541
 
        self._script_entry = self.connection.script.execute(
542
 
                self, query, parameters
543
 
                )
544
 
        self._fetch_position = 0
545
 
 
546
 
    def executemany(self, query, seq_of_parameters=None):
547
 
        """As per DB-API."""
548
 
        self._checkClosed()
549
 
        raise NotImplementedError('executemany')
550
 
 
551
 
    def fetchone(self):
552
 
        """As per DB-API."""
553
 
        self._checkClosed()
554
 
        if self._script_entry is None:
555
 
            raise psycopg2.Error("No query issued yet")
556
 
        if self._script_entry.results is None:
557
 
            raise psycopg2.Error("Query returned no results")
558
 
        try:
559
 
            row = self._script_entry.results[self._fetch_position]
560
 
            self._fetch_position += 1
561
 
            return row
562
 
        except IndexError:
563
 
            return None
564
 
 
565
 
    def fetchmany(self, size=None):
566
 
        """As per DB-API."""
567
 
        self._checkClosed()
568
 
        raise NotImplementedError('fetchmany')
569
 
 
570
 
    def fetchall(self):
571
 
        """As per DB-API."""
572
 
        self._checkClosed()
573
 
        if self._script_entry is None:
574
 
            raise psycopg2.Error('No query issued yet')
575
 
        if self._script_entry.results is None:
576
 
            raise psycopg2.Error('Query returned no results')
577
 
        results = self._script_entry.results[self._fetch_position:]
578
 
        self._fetch_position = len(results)
579
 
        return results
580
 
 
581
 
    def nextset(self):
582
 
        """As per DB-API."""
583
 
        self._checkClosed()
584
 
        raise NotImplementedError('nextset')
585
 
 
586
 
    def setinputsizes(self, sizes):
587
 
        """As per DB-API."""
588
 
        self._checkClosed()
589
 
        return # No-op.
590
 
 
591
 
    def setoutputsize(self, size, column=None):
592
 
        """As per DB-API."""
593
 
        self._checkClosed()
594
 
        return # No-op.
595
 
 
596
 
    ## psycopg1 does not support this extension.
597
 
    ##
598
 
    ## def next(self):
599
 
    ##     """As per iterator spec and DB-API optional extension."""
600
 
    ##     row = self.fetchone()
601
 
    ##     if row is None:
602
 
    ##         raise StopIteration
603
 
    ##     else:
604
 
    ##         return row
605
 
 
606
 
    ## def __iter__(self):
607
 
    ##     """As per iterator spec and DB-API optional extension."""
608
 
    ##     return self
609