1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# Copyright 2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Helpers for command line tools."""
__metaclass__ = type
__all__ = ["LPOptionParser", "TransactionFreeOperation", ]
import contextlib
from copy import copy
from datetime import datetime
from optparse import (
Option,
OptionParser,
OptionValueError,
)
import transaction
from lp.services.scripts.logger import logger_options
def _check_datetime(option, opt, value):
"Type checker for optparse datetime option type."
# We support 5 valid ISO8601 formats.
formats = [
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M',
'%Y-%m-%d',
]
for format in formats:
try:
return datetime.strptime(value, format)
except ValueError:
pass
raise OptionValueError(
"option %s: invalid datetime value: %r" % (opt, value))
class LPOption(Option):
"""Extended optparse Option class.
Adds a 'datetime' option type.
"""
TYPES = Option.TYPES + ("datetime", datetime)
TYPE_CHECKER = copy(Option.TYPE_CHECKER)
TYPE_CHECKER["datetime"] = _check_datetime
TYPE_CHECKER[datetime] = _check_datetime
class LPOptionParser(OptionParser):
"""Extended optparse OptionParser.
Adds a 'datetime' option type.
Automatically adds our standard --verbose, --quiet options that
tie into our logging system.
"""
def __init__(self, *args, **kw):
kw.setdefault('option_class', LPOption)
OptionParser.__init__(self, *args, **kw)
logger_options(self)
class TransactionFreeOperation:
"""Ensure that an operation has no active transactions.
This helps ensure that long-running operations do not hold a database
transaction. Long-running operations that hold a database transaction
may have their database connection killed, and hold locks that interfere
with other updates.
"""
count = 0
@staticmethod
def any_active_transactions():
return transaction.manager._txns != {}
@classmethod
def __enter__(cls):
if cls.any_active_transactions():
raise AssertionError('Transaction open before operation!')
@classmethod
def __exit__(cls, exc_type, exc_value, traceback):
if cls.any_active_transactions():
raise AssertionError('Operation opened transaction!')
cls.count += 1
@classmethod
@contextlib.contextmanager
def require(cls):
"""Require that TransactionFreeOperation is used at least once."""
old_count = cls.count
try:
yield
finally:
if old_count >= cls.count:
raise AssertionError('TransactionFreeOperation was not used.')
|