~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/load_test.py

  • Committer: John Arbash Meinel
  • Date: 2011-03-16 12:39:56 UTC
  • mfrom: (432.2.8 authors-733015)
  • Revision ID: john@arbash-meinel.com-20110316123956-6jherozycdjmt9px
Fix bug #733015. Have a separate Author(s) line from Committer.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Code to do some load testing against a loggerhead instance.
 
18
 
 
19
This is basically meant to take a list of actions to take, and run it against a
 
20
real host, and see how the results respond.::
 
21
 
 
22
    {"parameters": {
 
23
         "base_url": "http://localhost:8080",
 
24
     },
 
25
     "requests": [
 
26
        {"thread": "1", "relpath": "/changes"},
 
27
        {"thread": "1", "relpath": "/changes"},
 
28
        {"thread": "1", "relpath": "/changes"},
 
29
        {"thread": "1", "relpath": "/changes"}
 
30
     ],
 
31
    }
 
32
 
 
33
All threads have a Queue length of 1. When a third request for a given thread
 
34
is seen, no more requests are queued until that thread finishes its current
 
35
job. So this results in all requests being issued sequentially::
 
36
 
 
37
        {"thread": "1", "relpath": "/changes"},
 
38
        {"thread": "1", "relpath": "/changes"},
 
39
        {"thread": "1", "relpath": "/changes"},
 
40
        {"thread": "1", "relpath": "/changes"}
 
41
 
 
42
While this would cause all requests to be sent in parallel:
 
43
 
 
44
        {"thread": "1", "relpath": "/changes"},
 
45
        {"thread": "2", "relpath": "/changes"},
 
46
        {"thread": "3", "relpath": "/changes"},
 
47
        {"thread": "4", "relpath": "/changes"}
 
48
 
 
49
This should keep 2 threads pipelined with activity, as long as they finish in
 
50
approximately the same speed. We'll start the first thread running, and the
 
51
second thread, and queue up both with a second request once the first finishes.
 
52
When we get to the third request for thread "1", we block on queuing up more
 
53
work until the first thread 1 request has finished.
 
54
        {"thread": "1", "relpath": "/changes"},
 
55
        {"thread": "2", "relpath": "/changes"},
 
56
        {"thread": "1", "relpath": "/changes"},
 
57
        {"thread": "2", "relpath": "/changes"},
 
58
        {"thread": "1", "relpath": "/changes"},
 
59
        {"thread": "2", "relpath": "/changes"}
 
60
 
 
61
There is not currently a way to say "run all these requests keeping exactly 2
 
62
threads active". Though if you know the load pattern, you could approximate
 
63
this.
 
64
"""
 
65
 
 
66
import threading
 
67
import time
 
68
import Queue
 
69
 
 
70
import simplejson
 
71
 
 
72
from bzrlib import (
 
73
    errors,
 
74
    transport,
 
75
    urlutils,
 
76
    )
 
77
 
 
78
# This code will be doing multi-threaded requests against bzrlib.transport
 
79
# code. We want to make sure to load everything ahead of time, so we don't get
 
80
# lazy-import failures
 
81
_ = transport.get_transport('http://example.com')
 
82
 
 
83
 
 
84
class RequestDescription(object):
 
85
    """Describes info about a request."""
 
86
 
 
87
    def __init__(self, descrip_dict):
 
88
        self.thread = descrip_dict.get('thread', '1')
 
89
        self.relpath = descrip_dict['relpath']
 
90
 
 
91
 
 
92
class RequestWorker(object):
 
93
    """Process requests in a worker thread."""
 
94
 
 
95
    _timer = time.time
 
96
 
 
97
    def __init__(self, identifier, blocking_time=1.0, _queue_size=1):
 
98
        self.identifier = identifier
 
99
        self.queue = Queue.Queue(_queue_size)
 
100
        self.start_time = self.end_time = None
 
101
        self.stats = []
 
102
        self.blocking_time = blocking_time
 
103
 
 
104
    def step_next(self):
 
105
        url = self.queue.get(True, self.blocking_time)
 
106
        if url == '<noop>':
 
107
            # This is usually an indicator that we want to stop, so just skip
 
108
            # this one.
 
109
            self.queue.task_done()
 
110
            return
 
111
        self.start_time = self._timer()
 
112
        success = self.process(url)
 
113
        self.end_time = self._timer()
 
114
        self.update_stats(url, success)
 
115
        self.queue.task_done()
 
116
 
 
117
    def run(self, stop_event):
 
118
        while not stop_event.isSet():
 
119
            try:
 
120
                self.step_next()
 
121
            except Queue.Empty:
 
122
                pass
 
123
 
 
124
    def process(self, url):
 
125
        base, path = urlutils.split(url)
 
126
        t = transport.get_transport(base)
 
127
        try:
 
128
            # TODO: We should probably look into using some part of
 
129
            #       blocking_timeout to decide when to stop trying to read
 
130
            #       content
 
131
            content = t.get_bytes(path)
 
132
        except (errors.TransportError, errors.NoSuchFile):
 
133
            return False
 
134
        return True
 
135
 
 
136
    def update_stats(self, url, success):
 
137
        self.stats.append((url, success, self.end_time - self.start_time))
 
138
 
 
139
 
 
140
class ActionScript(object):
 
141
    """This tracks the actions that we want to perform."""
 
142
 
 
143
    _worker_class = RequestWorker
 
144
    _default_base_url = 'http://localhost:8080'
 
145
    _default_blocking_timeout = 60.0
 
146
 
 
147
    def __init__(self):
 
148
        self.base_url = self._default_base_url
 
149
        self.blocking_timeout = self._default_blocking_timeout
 
150
        self._requests = []
 
151
        self._threads = {}
 
152
        self.stop_event = threading.Event()
 
153
 
 
154
    @classmethod
 
155
    def parse(cls, content):
 
156
        script = cls()
 
157
        json_dict = simplejson.loads(content)
 
158
        if 'parameters' not in json_dict:
 
159
            raise ValueError('Missing "parameters" section')
 
160
        if 'requests' not in json_dict:
 
161
            raise ValueError('Missing "requests" section')
 
162
        param_dict = json_dict['parameters']
 
163
        request_list = json_dict['requests']
 
164
        base_url = param_dict.get('base_url', None)
 
165
        if base_url is not None:
 
166
            script.base_url = base_url
 
167
        blocking_timeout = param_dict.get('blocking_timeout', None)
 
168
        if blocking_timeout is not None:
 
169
            script.blocking_timeout = blocking_timeout
 
170
        for request_dict in request_list:
 
171
            script.add_request(request_dict)
 
172
        return script
 
173
 
 
174
    def add_request(self, request_dict):
 
175
        request = RequestDescription(request_dict)
 
176
        self._requests.append(request)
 
177
 
 
178
    def _get_worker(self, thread_id):
 
179
        if thread_id in self._threads:
 
180
            return self._threads[thread_id][0]
 
181
        handler = self._worker_class(thread_id,
 
182
                                     blocking_time=self.blocking_timeout/10.0)
 
183
 
 
184
        t = threading.Thread(target=handler.run, args=(self.stop_event,),
 
185
                             name='Thread-%s' % (thread_id,))
 
186
        self._threads[thread_id] = (handler, t)
 
187
        t.start()
 
188
        return handler
 
189
 
 
190
    def finish_queues(self):
 
191
        """Wait for all queues of all children to finish."""
 
192
        for h, t in self._threads.itervalues():
 
193
            h.queue.join()
 
194
 
 
195
    def stop_and_join(self):
 
196
        """Stop all running workers, and return.
 
197
 
 
198
        This will stop even if workers still have work items.
 
199
        """
 
200
        self.stop_event.set()
 
201
        for h, t in self._threads.itervalues():
 
202
            # Signal the queue that it should stop blocking, we don't have to
 
203
            # wait for the queue to empty, because we may see stop_event before
 
204
            # we see the <noop>
 
205
            h.queue.put('<noop>')
 
206
            # And join the controlling thread
 
207
            for i in range(10):
 
208
                t.join(self.blocking_timeout / 10.0)
 
209
                if not t.isAlive():
 
210
                    break
 
211
 
 
212
    def _full_url(self, relpath):
 
213
        return self.base_url + relpath
 
214
 
 
215
    def run(self):
 
216
        self.stop_event.clear()
 
217
        for request in self._requests:
 
218
            full_url = self._full_url(request.relpath)
 
219
            worker = self._get_worker(request.thread)
 
220
            worker.queue.put(full_url, True, self.blocking_timeout)
 
221
        self.finish_queues()
 
222
        self.stop_and_join()
 
223
 
 
224
 
 
225
def run_script(filename):
 
226
    with open(filename, 'rb') as f:
 
227
        content = f.read()
 
228
    script = ActionScript.parse(content)
 
229
    script.run()
 
230
    return script