1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
#!/usr/bin/python
#
# Copyright 2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
import re
import sys
from bzrlib.branch import Branch
from bzrlib.config import LocationConfig
from bzrlib.errors import NoSuchRevision
from bzrlib.transport import get_transport
class UsageError(Exception):
    """Error signalling that the user invoked the script incorrectly."""
def get_staging_revision():
    """Return the revision of db-stable that is deployed on staging.

    Reads 'successful-updates.txt' from https://staging.launchpad.net/ and
    takes the last whitespace-separated field of its final line.

    :return: The staging revno as an int. Corresponds to a revision of
        lp:launchpad/db-stable.
    """
    transport = get_transport('https://staging.launchpad.net/')
    update_log = transport.get_bytes('successful-updates.txt')
    # Only the final line of the file is consulted.
    newest_entry = update_log.splitlines()[-1]
    fields = newest_entry.split()
    return int(fields[-1])
def get_edge_revision():
    """Return the revision of stable that is deployed on edge.

    Fetches 'index.html' from https://edge.launchpad.net/ and uses the first
    line that ends in ' r<digits>'.

    :return: The edge revno as an int. Corresponds to a revision of
        lp:launchpad/stable.
    :raise ValueError: if no line of the page ends in a revision marker.
    """
    transport = get_transport('https://edge.launchpad.net/')
    page = transport.get_bytes('index.html')
    marker = re.compile(r' r(\d+)$')
    for text in page.splitlines():
        found = marker.search(text)
        if found is None:
            continue
        # The first matching line wins.
        return int(found.group(1))
    raise ValueError("Could not find revision number on edge home page")
def is_present(local_branch, deployed_location, deployed_revno):
    """Report whether the tip of `local_branch` is in a deployed revision.

    The deployed branch is opened from the 'local_location' user option
    configured for `deployed_location` when there is one, and from
    `deployed_location` itself otherwise.  If a configured mirror does not
    have `deployed_revno`, the revision id is looked up on
    `deployed_location` and fetched into the mirror's repository.

    :param local_branch: The branch whose last revision is checked.
    :param deployed_location: Location of the branch that gets deployed.
    :param deployed_revno: The revno of that branch which is deployed.
    :return: The result of asking the repository graph whether the last
        revision of `local_branch` is an ancestor of revision
        `deployed_revno` of the deployed branch.
    :raise NoSuchRevision: if `deployed_revno` cannot be resolved to a
        revision id and there is no mirror to refresh, or if the lookup on
        `deployed_location` fails as well.
    """
    local_mirror = LocationConfig(deployed_location).get_user_option(
        'local_location')
    if local_mirror is None:
        print(
            'Please configure a local_location for %s in locations.conf '
            'to improve performance.' % deployed_location)
        deployed_branch = Branch.open(deployed_location)
        # Nothing is fetched into the deployed branch itself, so a read
        # lock is enough; avoid write-locking the shared branch.
        deployed_branch.lock_read()
    else:
        deployed_branch = Branch.open(local_mirror)
        # The mirror may have to be updated below: take a write lock.
        deployed_branch.lock_write()
    try:
        try:
            deployed_rev_id = deployed_branch.get_rev_id(deployed_revno)
        except NoSuchRevision:
            if local_mirror is None:
                # We already asked the deployed branch; nowhere else to look.
                raise
            remote = Branch.open(deployed_location)
            remote.lock_read()
            try:
                deployed_rev_id = remote.get_rev_id(deployed_revno)
                print("Mirror %s is out of date." % deployed_branch.base)
                mirror_repo = deployed_branch.repository
                if not mirror_repo.has_revision(deployed_rev_id):
                    mirror_repo.fetch(remote.repository, deployed_rev_id)
                assert mirror_repo.has_revision(deployed_rev_id)
            finally:
                remote.unlock()
        graph = deployed_branch.repository.get_graph(local_branch.repository)
        return graph.is_ancestor(local_branch.last_revision(), deployed_rev_id)
    finally:
        deployed_branch.unlock()
# Locations of the branches that main() checks: `stable` against the edge
# revision and `dbstable` against the staging revision.
_LAUNCHPAD_PQM_ROOT = 'bzr+ssh://bazaar.launchpad.net/~launchpad-pqm/launchpad/'
stable = _LAUNCHPAD_PQM_ROOT + 'stable'
dbstable = _LAUNCHPAD_PQM_ROOT + 'db-stable'
def main(argv):
    """Print whether a branch's tip is deployed on edge and on staging.

    :param argv: Command-line arguments without the program name.  The
        first one, if any, is the location of the branch to check;
        otherwise '.' is used.
    """
    # argv excludes the program name, so its first element is the location.
    if argv:
        location = argv[0]
    else:
        location = '.'
    branch = Branch.open_containing(location)[0]
    branch.lock_read()
    try:
        print('Branch: %s' % branch.base)
        print('Deployed on edge: %s' % is_present(
            branch, stable, get_edge_revision()))
        print('Deployed on staging: %s' % is_present(
            branch, dbstable, get_staging_revision()))
    finally:
        branch.unlock()
if __name__ == '__main__':
    # Run the script; a UsageError is reported and turned into exit status 1.
    try:
        exit_status = main(sys.argv[1:])
    except UsageError as error:
        print('ERROR: %s' % error)
        exit_status = 1
    sys.exit(exit_status)
|