# Copyright 2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Code to do some load testing against a loggerhead instance.

This is basically meant to take a list of actions to take, and run it against a
real host, and see how the results respond.::

  {"parameters": {"base_url": "http://localhost:8080"},
   "requests": [
       {"thread": "1", "relpath": "/changes"},
       {"thread": "1", "relpath": "/changes"},
       {"thread": "1", "relpath": "/changes"},
       {"thread": "1", "relpath": "/changes"}
  ]}

All threads have a Queue length of 1. When a third request for a given thread
is seen, no more requests are queued until that thread finishes its current
job. So this results in all requests being issued sequentially::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"}

While this would cause all requests to be sent in parallel::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "3", "relpath": "/changes"},
    {"thread": "4", "relpath": "/changes"}

This should keep 2 threads pipelined with activity, as long as they finish in
approximately the same speed. We'll start the first thread running, and the
second thread, and queue up both with a second request once the first finishes.
When we get to the third request for thread "1", we block on queuing up more
work until the first thread 1 request has finished::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"}

There is not currently a way to say "run all these requests keeping exactly 2
threads active". Though if you know the load pattern, you could approximate
this.
"""
# This code will be doing multi-threaded requests against bzrlib.transport
# code. We want to make sure to load everything ahead of time, so we don't get
# lazy-import failures once worker threads are running.
_ = transport.get_transport('http://example.com')
class RequestDescription(object):
    """Describes info about a request.

    :ivar thread: the worker-thread identifier this request is assigned to;
        requests sharing a thread id are issued sequentially (see the module
        docstring). Defaults to '1' when the dict has no 'thread' key.
    :ivar relpath: the path (relative to the base url) to request.
    """

    def __init__(self, descrip_dict):
        self.thread = descrip_dict.get('thread', '1')
        self.relpath = descrip_dict['relpath']
class RequestWorker(object):
    """Process requests in a worker thread.

    Work items (urls) are fed in via ``self.queue``; the special item
    '<noop>' is used to wake the worker without doing any request (see
    ActionScript.stop_and_join).
    """

    # Hook point so tests can supply a deterministic clock.
    # NOTE(review): reconstructed from a garbled extraction — self._timer()
    # is clearly called below; confirm the upstream definition.
    _timer = time.time

    def __init__(self, identifier, blocking_time=1.0, _queue_size=1):
        self.identifier = identifier
        # Queue length defaults to 1, which is what gives the per-thread
        # sequencing behaviour described in the module docstring.
        self.queue = Queue.Queue(_queue_size)
        self.start_time = self.end_time = None
        # (url, success, elapsed_seconds) tuples, appended by update_stats.
        self.stats = []
        self.blocking_time = blocking_time

    def step_next(self):
        """Pull one url off the queue, process it, and record timing stats."""
        url = self.queue.get(True, self.blocking_time)
        if url == '<noop>':
            # This is usually an indicator that we want to stop, so just skip
            # this item.
            self.queue.task_done()
            return
        self.start_time = self._timer()
        success = self.process(url)
        self.end_time = self._timer()
        self.update_stats(url, success)
        self.queue.task_done()

    def run(self, stop_event):
        """Process queue items until ``stop_event`` is set."""
        while not stop_event.isSet():
            try:
                self.step_next()
            except Queue.Empty:
                # Timed out waiting for work; loop around and re-check
                # stop_event.
                pass

    def process(self, url):
        """Fetch ``url`` once; return True on success, False on failure."""
        base, path = urlutils.split(url)
        t = transport.get_transport(base)
        try:
            # TODO: We should probably look into using some part of
            #       blocking_timeout to decide when to stop trying to read
            #       content
            content = t.get_bytes(path)
        except (errors.TransportError, errors.NoSuchFile):
            return False
        return True

    def update_stats(self, url, success):
        """Record one (url, success, elapsed-time) sample."""
        self.stats.append((url, success, self.end_time - self.start_time))
class ActionScript(object):
    """This tracks the actions that we want to perform.

    An ActionScript is usually built from a JSON description via
    :meth:`parse`, and owns one RequestWorker (plus its controlling
    threading.Thread) per distinct request thread id.
    """

    _worker_class = RequestWorker
    _default_base_url = 'http://localhost:8080'
    _default_blocking_timeout = 60.0

    def __init__(self):
        self.base_url = self._default_base_url
        self.blocking_timeout = self._default_blocking_timeout
        # RequestDescription objects, in the order they should be issued.
        self._requests = []
        # thread_id -> (RequestWorker, threading.Thread)
        self._threads = {}
        self.stop_event = threading.Event()

    @classmethod
    def parse(cls, content):
        """Build an ActionScript from its JSON text ``content``.

        :raises ValueError: if the "parameters" or "requests" sections
            are missing.
        """
        script = cls()
        json_dict = simplejson.loads(content)
        if 'parameters' not in json_dict:
            raise ValueError('Missing "parameters" section')
        if 'requests' not in json_dict:
            raise ValueError('Missing "requests" section')
        param_dict = json_dict['parameters']
        request_list = json_dict['requests']
        base_url = param_dict.get('base_url', None)
        if base_url is not None:
            script.base_url = base_url
        blocking_timeout = param_dict.get('blocking_timeout', None)
        if blocking_timeout is not None:
            script.blocking_timeout = blocking_timeout
        for request_dict in request_list:
            script.add_request(request_dict)
        return script

    def add_request(self, request_dict):
        """Queue up one request description (a dict) for later execution."""
        request = RequestDescription(request_dict)
        self._requests.append(request)

    def _get_worker(self, thread_id):
        """Return the worker for ``thread_id``, starting it on first use."""
        if thread_id in self._threads:
            return self._threads[thread_id][0]
        handler = self._worker_class(thread_id,
            blocking_time=self.blocking_timeout/10.0)
        t = threading.Thread(target=handler.run, args=(self.stop_event,),
                             name='Thread-%s' % (thread_id,))
        self._threads[thread_id] = (handler, t)
        # NOTE(review): reconstructed — the thread must be started and the
        # handler returned for the visible callers to work; confirm upstream.
        t.start()
        return handler

    def finish_queues(self):
        """Wait for all queues of all children to finish."""
        for h, t in self._threads.itervalues():
            h.queue.join()

    def stop_and_join(self):
        """Stop all running workers, and return.

        This will stop even if workers still have work items.
        """
        self.stop_event.set()
        for h, t in self._threads.itervalues():
            # Signal the queue that it should stop blocking, we don't have to
            # wait for the queue to empty, because we may see stop_event before
            # the queue is drained.
            h.queue.put('<noop>')
            # And join the controlling thread
            t.join(self.blocking_timeout / 10.0)

    def _full_url(self, relpath):
        """Join ``relpath`` onto the configured base url."""
        return self.base_url + relpath

    def run(self):
        """Issue all queued requests, then wait for the workers to finish."""
        self.stop_event.clear()
        for request in self._requests:
            full_url = self._full_url(request.relpath)
            worker = self._get_worker(request.thread)
            # Blocks once a worker's (length-1) queue is full, which is what
            # serializes requests that share a thread id.
            worker.queue.put(full_url, True, self.blocking_timeout)
        # NOTE(review): the tail of this method is missing from the garbled
        # source; draining and shutting down here matches the visible
        # finish_queues/stop_and_join helpers — confirm against upstream.
        self.finish_queues()
        self.stop_and_join()
def run_script(filename):
    """Load a JSON action script from ``filename`` and execute it.

    :return: the ActionScript that was run, so callers can inspect the
        per-worker stats.
    """
    with open(filename, 'rb') as f:
        # NOTE(review): reconstructed — ``content`` is consumed below but its
        # assignment was lost in the garbled extraction.
        content = f.read()
    script = ActionScript.parse(content)
    # NOTE(review): the trailing lines of this function are missing from this
    # chunk; running the parsed script and returning it is the presumed
    # behaviour — confirm against upstream.
    script.run()
    return script