1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# Copyright 2006 Canonical Ltd. All rights reserved.
"""Alter the standard functional testing environment for Launchpad."""
from cStringIO import StringIO
import httplib
import xmlrpclib
from zope.app.testing.functional import HTTPCaller
from zope.security.management import endInteraction, queryInteraction
from canonical.launchpad.webapp.interaction import (
get_current_principal, setupInteraction)
class HTTPCallerHTTPConnection(httplib.HTTPConnection):
    """A HTTPConnection which talks to HTTPCaller instead of a real server.

    Only the methods called by xmlrpclib are overridden.
    """

    # Raw request data accumulated by send() until a response is requested.
    _data_to_send = ''
    # Response object returned by HTTPCaller; None until getresponse() has
    # actually dispatched the request.
    _response = None

    def __init__(self, host):
        """Initialise the connection and create the HTTPCaller to use."""
        httplib.HTTPConnection.__init__(self, host)
        self.caller = HTTPCaller()

    def connect(self):
        """No need to connect."""
        pass

    def send(self, data):
        """Send the request to HTTPCaller."""
        # We don't send it to HTTPCaller yet; we store the data and send
        # everything at once when the client requests a response.
        self._data_to_send += data

    def getresponse(self):
        """Get the response.

        The accumulated request data is passed to HTTPCaller the first
        time this is called; the resulting response is cached and
        returned by every later call.
        """
        current_principal = None
        # End and save the current interaction, since HTTPCaller creates
        # its own interaction.
        if queryInteraction():
            current_principal = get_current_principal()
            endInteraction()
        try:
            if self._response is None:
                self._response = self.caller(self._data_to_send)
        finally:
            # Restore the interaction to what it was before, even if
            # HTTPCaller raised, so that a failed request doesn't leave
            # the calling code without its interaction.
            setupInteraction(current_principal)
        return self._response

    def getreply(self):
        """Return a tuple of status code, reason string, and headers."""
        response = self.getresponse()
        return (
            response.getStatus(),
            response.getStatusString(),
            response.getHeaders())

    def getfile(self):
        """Get the response body as a file like object."""
        response = self.getresponse()
        return StringIO(response.consumeBody())
class XMLRPCTestTransport(xmlrpclib.Transport):
    """XMLRPC Transport whose connections are served by HTTPCaller."""

    def make_connection(self, host):
        """Build the HTTPCaller-backed connection for `host`.

        The host specification is first run through get_host_info(); only
        the host part of its result is used for the connection.
        """
        parsed_host, _extra_headers, _x509 = self.get_host_info(host)
        connection = HTTPCallerHTTPConnection(parsed_host)
        return connection
|