~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python
#
# Copyright 2010 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

import re
import sys

from bzrlib.branch import Branch
from bzrlib.config import LocationConfig
from bzrlib.errors import NoSuchRevision
from bzrlib.transport import get_transport


class UsageError(Exception):
    """Error caused by the user invoking this script incorrectly.

    The ``__main__`` block catches this exception, prints its message
    prefixed with ``ERROR:`` and exits with status 1.
    """
    pass


def get_staging_revision():
    """Return the revno of db-stable that is deployed on staging.

    Downloads 'successful-updates.txt' from the staging site and reads the
    last whitespace-separated field of its final line.

    :return: The staging revno as an int. Corresponds to a revision of
        lp:launchpad/db-stable.
    """
    staging = get_transport('https://staging.launchpad.net/')
    update_log = staging.get_bytes('successful-updates.txt')
    # Only the final line of the file is consulted.
    newest_entry = update_log.splitlines()[-1]
    revno_field = newest_entry.split()[-1]
    return int(revno_field)


def get_edge_revision():
    """Return the revno of stable that is deployed on edge.

    Downloads the edge home page and scans it line by line for the first
    line that ends in " r<digits>"; those digits are the revno.

    :return: The edge revno as an int. Corresponds to a revision of
        lp:launchpad/stable.
    :raises ValueError: if no line of the page ends with a revision marker.
    """
    edge = get_transport('https://edge.launchpad.net/')
    page_lines = edge.get_bytes('index.html').splitlines()
    find_revno = re.compile(r' r(\d+)$').search
    # Lazily search each line; the first hit wins.
    hits = (find_revno(text) for text in page_lines)
    for hit in hits:
        if hit is not None:
            return int(hit.group(1))
    raise ValueError("Could not find revision number on edge home page")


def is_present(local_branch, deployed_location, deployed_revno):
    """Report whether the tip of local_branch is part of a deployed revision.

    The deployed branch is opened from the 'local_location' configured for
    deployed_location in locations.conf when there is one, and from
    deployed_location itself otherwise. If a configured mirror does not yet
    know deployed_revno, the revision is looked up on the remote branch and
    fetched into the mirror's repository.

    :param local_branch: The branch whose last revision is checked. Its
        repository is consulted when building the revision graph; main()
        read-locks the branch before calling this function.
    :param deployed_location: URL of the branch the deployment is made from.
    :param deployed_revno: Revno, on the deployed branch, of the revision
        that is currently deployed.
    :return: True if the last revision of local_branch is an ancestor of the
        deployed revision, False otherwise.
    :raises NoSuchRevision: if deployed_revno cannot be resolved and there
        is no local mirror to fall back from (or the remote branch does not
        have it either).
    """
    local_mirror = LocationConfig(deployed_location).get_user_option(
        'local_location')
    if local_mirror is None:
        print (
            'Please configure a local_location for %s in locations.conf '
            'to improve performance.' % deployed_location)
        deployed_branch = Branch.open(deployed_location)
        # Without a mirror this is the remote branch and it is only ever
        # read from, so a read lock suffices. A write lock would require
        # write access to the deployed branch.
        deployed_branch.lock_read()
    else:
        deployed_branch = Branch.open(local_mirror)
        # The mirror may have the deployed revision fetched into it below,
        # so it has to be write-locked.
        deployed_branch.lock_write()
    try:
        try:
            deployed_rev_id = deployed_branch.get_rev_id(deployed_revno)
        except NoSuchRevision:
            if local_mirror is None:
                # Already asked the remote branch; nothing else to try.
                raise
            # The mirror is behind: resolve the revno remotely and pull the
            # missing revision into the mirror's repository.
            remote = Branch.open(deployed_location)
            remote.lock_read()
            try:
                deployed_rev_id = remote.get_rev_id(deployed_revno)
                print ("Mirror %s is out of date." % deployed_branch.base)
                if not deployed_branch.repository.has_revision(
                    deployed_rev_id):
                    deployed_branch.repository.fetch(
                        remote.repository, deployed_rev_id)
                # Sanity check that the fetch really delivered the revision.
                assert deployed_branch.repository.has_revision(
                    deployed_rev_id)
            finally:
                remote.unlock()
        # Build a graph spanning both repositories so that revisions present
        # in only one of them can still be related to each other.
        graph = deployed_branch.repository.get_graph(local_branch.repository)
        return graph.is_ancestor(local_branch.last_revision(), deployed_rev_id)
    finally:
        deployed_branch.unlock()



# Branch checked against the revision deployed on edge.
stable = (
    'bzr+ssh://bazaar.launchpad.net/~launchpad-pqm/launchpad/stable')
# Branch checked against the revision deployed on staging.
dbstable = (
    'bzr+ssh://bazaar.launchpad.net/~launchpad-pqm/launchpad/db-stable')
def main(argv):
    """Print whether a branch's tip is deployed on edge and on staging.

    :param argv: Command-line arguments without the program name. The first
        argument, if given, is the location of the branch to check;
        otherwise the branch containing the current directory is used.
    :return: None.
    """
    # Honour the argument list that was passed in rather than reaching for
    # sys.argv, so callers other than the command line work as expected.
    if argv:
        location = argv[0]
    else:
        location = '.'
    b = Branch.open_containing(location)[0]
    # Hold a read lock for the whole run; is_present() reads from this
    # branch and its repository.
    b.lock_read()
    try:
        print ('Branch: %s' % b.base)
        print ('Deployed on edge: %s' % is_present(
            b, stable, get_edge_revision()))
        print ('Deployed on staging: %s' % is_present(
            b, dbstable, get_staging_revision()))
    finally:
        b.unlock()


if __name__ == '__main__':
    try:
        sys.exit(main(sys.argv[1:]))
    except UsageError, e:
        print 'ERROR: %s' % e
        sys.exit(1)