~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Testing registry-related xmlrpc calls."""

# Python 2 idiom: classes defined in this module without an explicit base
# become new-style classes.
__metaclass__ = type

import xmlrpclib

from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy

from lp.registry.interfaces.person import (
    IPersonSet,
    ISoftwareCenterAgentAPI,
    ISoftwareCenterAgentApplication,
    PersonCreationRationale,
    )
from lp.registry.xmlrpc.softwarecenteragent import SoftwareCenterAgentAPI
from lp.services.identity.interfaces.account import AccountStatus
from lp.services.webapp.servers import LaunchpadTestRequest
from lp.testing import TestCaseWithFactory
from lp.testing.layers import LaunchpadFunctionalLayer
from lp.testing.xmlrpc import XMLRPCTestTransport
from lp.xmlrpc.interfaces import IPrivateApplication


class TestSoftwareCenterAgentAPI(TestCaseWithFactory):
    """Exercise SoftwareCenterAgentAPI by calling the view directly."""

    layer = LaunchpadFunctionalLayer

    def setUp(self):
        super(TestSoftwareCenterAgentAPI, self).setUp()
        # Build the API view on top of the private application's
        # softwarecenteragent attribute, using a test request.
        self.private_root = getUtility(IPrivateApplication)
        agent_application = self.private_root.softwarecenteragent
        test_request = LaunchpadTestRequest()
        self.sca_api = SoftwareCenterAgentAPI(
            context=agent_application, request=test_request)

    def test_provides_interface(self):
        # The API view must provide ISoftwareCenterAgentAPI.
        api_view = self.sca_api
        self.assertProvides(api_view, ISoftwareCenterAgentAPI)

    def test_getOrCreateSoftwareCenterCustomer(self):
        # The call hands back the person's name, and the person found
        # under that name carries the given OpenID identifier together
        # with the Software Center creation rationale and comment.
        customer_name = self.sca_api.getOrCreateSoftwareCenterCustomer(
            u'openid-ident', 'alice@b.com', 'Joe Blogs')
        self.assertEqual('alice', customer_name)

        customer = getUtility(IPersonSet).getByName(customer_name)

        # The account's OpenID identifiers are read with the security
        # proxy removed.
        naked_account = removeSecurityProxy(customer.account)
        identifier = naked_account.openid_identifiers.any().identifier
        self.assertEqual('openid-ident', identifier)

        expected_rationale = PersonCreationRationale.SOFTWARE_CENTER_PURCHASE
        self.assertEqual(expected_rationale, customer.creation_rationale)

        expected_comment = (
            "when purchasing an application via Software Center.")
        self.assertEqual(expected_comment, customer.creation_comment)


class TestSoftwareCenterAgentApplication(TestCaseWithFactory):
    """Exercise the software center agent application over XML-RPC.

    Calls are made through an `xmlrpclib.ServerProxy` that uses
    `XMLRPCTestTransport` as its transport.
    """

    layer = LaunchpadFunctionalLayer

    def setUp(self):
        super(TestSoftwareCenterAgentApplication, self).setUp()
        self.private_root = getUtility(IPrivateApplication)
        # Proxy pointed at the private XML-RPC host.
        self.rpc_proxy = xmlrpclib.ServerProxy(
            'http://xmlrpc-private.launchpad.dev:8087/softwarecenteragent',
            transport=XMLRPCTestTransport())

    def test_provides_interface(self):
        # The application is provided.
        self.assertProvides(
            self.private_root.softwarecenteragent,
            ISoftwareCenterAgentApplication)

    def test_getOrCreateSoftwareCenterCustomer_xmlrpc(self):
        # The method can be called via xmlrpc
        user_name = self.rpc_proxy.getOrCreateSoftwareCenterCustomer(
            u'openid-ident', 'a@b.com', 'Joe Blogs')
        person = getUtility(IPersonSet).getByName(user_name)
        self.assertEqual(
            u'openid-ident',
            removeSecurityProxy(
                person.account).openid_identifiers.any().identifier)

    def test_getOrCreateSoftwareCenterCustomer_xmlrpc_error(self):
        # A suspended account results in an appropriate xmlrpc fault.
        suspended_person = self.factory.makePerson(
            displayname='Joe Blogs', email='a@b.com',
            account_status=AccountStatus.SUSPENDED)
        openid_identifier = removeSecurityProxy(
            suspended_person.account).openid_identifiers.any().identifier

        # Catch the fault by hand so that its code and message can be
        # checked, not just its type.
        try:
            self.rpc_proxy.getOrCreateSoftwareCenterCustomer(
                openid_identifier, 'a@b.com', 'Joe Blogs')
        except xmlrpclib.Fault as e:
            self.assertEqual(370, e.faultCode)
            self.assertIn(openid_identifier, e.faultString)
        else:
            # Reaching here means the call returned normally.
            self.fail(
                "Expected an xmlrpclib.Fault for a suspended account.")

    def test_not_available_on_public_api(self):
        # The software center agent API is not available on the public
        # xmlrpc service.
        public_rpc_proxy = xmlrpclib.ServerProxy(
            'http://test@canonical.com:test@'
            'xmlrpc.launchpad.dev/softwarecenteragent',
            transport=XMLRPCTestTransport())

        # Catch the protocol error by hand so that its error code can
        # be checked, not just its type.
        try:
            public_rpc_proxy.getOrCreateSoftwareCenterCustomer(
                'openid-ident', 'a@b.com', 'Joe Blogs')
        except xmlrpclib.ProtocolError as e:
            self.assertEqual(404, e.errcode)
        else:
            # Reaching here means the public service answered the call.
            self.fail(
                "Expected an xmlrpclib.ProtocolError from the public "
                "xmlrpc service.")