~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright 2010 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# We like global statements!
# pylint: disable-msg=W0602,W0603
__metaclass__ = type

__all__ = [
    'launchpadlib_credentials_for',
    'launchpadlib_for',
    'oauth_access_token_for',
    ]


import shutil
import tempfile

from launchpadlib.credentials import (
    AccessToken,
    AnonymousAccessToken,
    Credentials,
    )
from launchpadlib.launchpad import Launchpad
import transaction
from zope.app.publication.interfaces import IEndRequestEvent
from zope.app.testing import ztapi
from zope.component import getUtility
import zope.testing.cleanup

from lp.registry.interfaces.person import IPersonSet
from lp.services.oauth.interfaces import IOAuthConsumerSet
from lp.services.webapp.adapter import get_request_statements
from lp.services.webapp.interaction import ANONYMOUS
from lp.services.webapp.interfaces import OAuthPermission
from lp.services.webapp.publisher import canonical_url
from lp.testing._login import (
    login,
    logout,
    )


def api_url(obj):
    """Return the web service URL of a data model object.

    Handy for loading an object that was just created by the factory
    into launchpadlib.

    :param obj: The data model object whose URL is wanted.

    :return: A relative URL suitable for passing into Launchpad.load().
    """
    # force_local_path=True is what makes canonical_url produce the
    # relative form described above.
    relative_url = canonical_url(obj, force_local_path=True)
    return relative_url


def oauth_access_token_for(consumer_name, person, permission, context=None):
    """Return an OAuth access token for a person, creating one if needed.

    An existing token is reused only when the consumer already exists
    and the person holds a token for the same consumer, permission and
    context.  Otherwise a request token is created, reviewed on behalf
    of the person and exchanged for a new access token.

    :param consumer_name: An OAuth consumer name.
    :param person: A person (or the name of a person) for whom to create
        or find credentials.
    :param permission: An OAuthPermission (or its token) designating
        the level of permission the credentials should have.
    :param context: The OAuth context for the credentials (or a string
        designating same).

    :return: An OAuthAccessToken object.
    """
    # Normalise the string shorthands into the real objects first.
    if isinstance(person, basestring):
        person = getUtility(IPersonSet).getByName(person)
    if isinstance(context, basestring):
        # Imported here rather than at module level to avoid an
        # import loop.
        from lp.services.oauth.browser import lookup_oauth_context
        context = lookup_oauth_context(context)
    if isinstance(permission, basestring):
        permission = OAuthPermission.items[permission]

    consumers = getUtility(IOAuthConsumerSet)
    consumer = consumers.getByKey(consumer_name)
    if consumer is None:
        # A brand new consumer cannot have any tokens yet, so there is
        # nothing to search for.
        consumer = consumers.new(consumer_name)
    else:
        # The consumer already existed: hand back the first token that
        # matches consumer+permission+context, if the person has one.
        for candidate in person.oauth_access_tokens:
            if (candidate.consumer == consumer
                and candidate.permission == permission
                and candidate.context == context):
                return candidate

    # No suitable token exists. Create a request token, review it for
    # this person and turn it into an access token.
    pending = consumer.newRequestToken()
    pending.review(person, permission, context)
    return pending.createAccessToken()


def launchpadlib_credentials_for(
    consumer_name, person, permission=OAuthPermission.WRITE_PRIVATE,
    context=None):
    """Create launchpadlib credentials for the given person.

    The OAuth access token is found or created inside an anonymous
    interaction, which is always ended again before returning or
    propagating an error.

    :param consumer_name: An OAuth consumer name.
    :param person: A person (or the name of a person) for whom to create
        or find credentials.
    :param permission: An OAuthPermission (or its token) designating
        the level of permission the credentials should have.
    :param context: The OAuth context for the credentials.
    :return: A launchpadlib Credentials object.
    """
    # Start an interaction so that oauth_access_token_for will
    # succeed.  oauth_access_token_for may be called in any layer, but
    # launchpadlib_credentials_for is only called in the
    # PageTestLayer, when a Launchpad instance is running for
    # launchpadlib to use.
    login(ANONYMOUS)
    try:
        access_token = oauth_access_token_for(
            consumer_name, person, permission, context)
    finally:
        # Log out even when the token lookup fails, so a failure here
        # does not leave the anonymous interaction open for the caller.
        logout()
    launchpadlib_token = AccessToken(
        access_token.key, access_token.secret)
    return Credentials(consumer_name=consumer_name,
                       access_token=launchpadlib_token)


def _clean_up_cache(cache):
    """Remove a temporary launchpadlib cache directory tree.

    Removal errors are ignored (ignore_errors=True).

    :param cache: Path of the cache directory to delete.
    """
    shutil.rmtree(path=cache, ignore_errors=True)


def launchpadlib_for(
    consumer_name, person=None, permission=OAuthPermission.WRITE_PRIVATE,
    context=None, version="devel", service_root="http://api.launchpad.dev/"):
    """Build a launchpadlib Launchpad object acting as the given person.

    When no person is given, the object uses an anonymous access token.
    The current transaction is committed before the Launchpad object is
    built, and a temporary cache directory is created and registered
    for removal with zope.testing.cleanup.

    :param consumer_name: An OAuth consumer name.
    :param person: A person (or the name of a person) for whom to create
        or find credentials.
    :param permission: An OAuthPermission (or its token) designating
        the level of permission the credentials should have.
    :param context: The OAuth context for the credentials.
    :param version: The version of the web service to access.
    :param service_root: The root URL of the web service to access.

    :return: A launchpadlib Launchpad object.
    """
    if person is not None:
        credentials = launchpadlib_credentials_for(
            consumer_name, person, permission, context)
    else:
        credentials = Credentials(
            consumer_name, access_token=AnonymousAccessToken())
    transaction.commit()
    # Each Launchpad object gets its own throwaway cache directory,
    # which is scheduled for deletion as a registered cleanup.
    cache_dir = tempfile.mkdtemp(prefix='launchpadlib-cache-')
    zope.testing.cleanup.addCleanUp(_clean_up_cache, (cache_dir,))
    launchpad = Launchpad(
        credentials, None, None, service_root=service_root,
        version=version, cache=cache_dir)
    return launchpad


class QueryCollector:
    """Record the database statements issued by web requests.

    The statements are only available once a request has ended, so this
    class subscribes to the end-of-request event and keeps hold of what
    the most recent request did, letting tests make assertions on it.

    :ivar count: The count of db queries the last web request made.
    :ivar queries: The list of queries made. See
        lp.services.webapp.adapter.get_request_statements for more
        information.
    """

    def __init__(self):
        # Nothing has been collected yet; both stay None until a request
        # ends while the collector is active.
        self.queries = None
        self.count = None
        self._active = False

    def register(self):
        """Subscribe to end-of-request events and start collecting.

        The count and queries attributes are refreshed after every web
        request from then on.  Call unregister when done with the
        collector.
        """
        ztapi.subscribe((IEndRequestEvent, ), None, self)
        self._active = True

    def __call__(self, event):
        """Event handler: snapshot the statements of the ended request."""
        if not self._active:
            return
        statements = get_request_statements()
        self.queries = statements
        self.count = len(statements)

    def unregister(self):
        """Stop collecting; later requests leave count and queries alone."""
        self._active = False