~loggerhead-team/loggerhead/trunk-rich

« back to all changes in this revision

Viewing changes to loggerhead/load_test.py

  • Committer: Martin Albisetti
  • Date: 2008-08-06 19:38:14 UTC
  • Revision ID: argentina@gmail.com-20080806193814-7h2hgep2fyktlkql
Further cleaning

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Code to do some load testing against a loggerhead instance.
18
 
 
19
 
This is basically meant to take a list of actions to take, and run it against a
20
 
real host, and see how the results respond.::
21
 
 
22
 
    {"parameters": {
23
 
         "base_url": "http://localhost:8080",
24
 
     },
25
 
     "requests": [
26
 
        {"thread": "1", "relpath": "/changes"},
27
 
        {"thread": "1", "relpath": "/changes"},
28
 
        {"thread": "1", "relpath": "/changes"},
29
 
        {"thread": "1", "relpath": "/changes"}
30
 
     ],
31
 
    }
32
 
 
33
 
All threads have a Queue length of 1. When a third request for a given thread
34
 
is seen, no more requests are queued until that thread finishes its current
35
 
job. So this results in all requests being issued sequentially::
36
 
 
37
 
        {"thread": "1", "relpath": "/changes"},
38
 
        {"thread": "1", "relpath": "/changes"},
39
 
        {"thread": "1", "relpath": "/changes"},
40
 
        {"thread": "1", "relpath": "/changes"}
41
 
 
42
 
While this would cause all requests to be sent in parallel:
43
 
 
44
 
        {"thread": "1", "relpath": "/changes"},
45
 
        {"thread": "2", "relpath": "/changes"},
46
 
        {"thread": "3", "relpath": "/changes"},
47
 
        {"thread": "4", "relpath": "/changes"}
48
 
 
49
 
This should keep 2 threads pipelined with activity, as long as they finish in
50
 
approximately the same speed. We'll start the first thread running, and the
51
 
second thread, and queue up both with a second request once the first finishes.
52
 
When we get to the third request for thread "1", we block on queuing up more
53
 
work until the first thread 1 request has finished.
54
 
        {"thread": "1", "relpath": "/changes"},
55
 
        {"thread": "2", "relpath": "/changes"},
56
 
        {"thread": "1", "relpath": "/changes"},
57
 
        {"thread": "2", "relpath": "/changes"},
58
 
        {"thread": "1", "relpath": "/changes"},
59
 
        {"thread": "2", "relpath": "/changes"}
60
 
 
61
 
There is not currently a way to say "run all these requests keeping exactly 2
62
 
threads active". Though if you know the load pattern, you could approximate
63
 
this.
64
 
"""
65
 
 
66
 
import threading
67
 
import time
68
 
import Queue
69
 
 
70
 
import simplejson
71
 
 
72
 
from bzrlib import (
73
 
    errors,
74
 
    transport,
75
 
    urlutils,
76
 
    )
77
 
 
78
 
# This code will be doing multi-threaded requests against bzrlib.transport
79
 
# code. We want to make sure to load everything ahead of time, so we don't get
80
 
# lazy-import failures
81
 
_ = transport.get_transport('http://example.com')
82
 
 
83
 
 
84
 
class RequestDescription(object):
85
 
    """Describes info about a request."""
86
 
 
87
 
    def __init__(self, descrip_dict):
88
 
        self.thread = descrip_dict.get('thread', '1')
89
 
        self.relpath = descrip_dict['relpath']
90
 
 
91
 
 
92
 
class RequestWorker(object):
93
 
    """Process requests in a worker thread."""
94
 
 
95
 
    _timer = time.time
96
 
 
97
 
    def __init__(self, identifier, blocking_time=1.0, _queue_size=1):
98
 
        self.identifier = identifier
99
 
        self.queue = Queue.Queue(_queue_size)
100
 
        self.start_time = self.end_time = None
101
 
        self.stats = []
102
 
        self.blocking_time = blocking_time
103
 
 
104
 
    def step_next(self):
105
 
        url = self.queue.get(True, self.blocking_time)
106
 
        if url == '<noop>':
107
 
            # This is usually an indicator that we want to stop, so just skip
108
 
            # this one.
109
 
            self.queue.task_done()
110
 
            return
111
 
        self.start_time = self._timer()
112
 
        success = self.process(url)
113
 
        self.end_time = self._timer()
114
 
        self.update_stats(url, success)
115
 
        self.queue.task_done()
116
 
 
117
 
    def run(self, stop_event):
118
 
        while not stop_event.isSet():
119
 
            try:
120
 
                self.step_next()
121
 
            except Queue.Empty:
122
 
                pass
123
 
 
124
 
    def process(self, url):
125
 
        base, path = urlutils.split(url)
126
 
        t = transport.get_transport(base)
127
 
        try:
128
 
            # TODO: We should probably look into using some part of
129
 
            #       blocking_timeout to decide when to stop trying to read
130
 
            #       content
131
 
            content = t.get_bytes(path)
132
 
        except (errors.TransportError, errors.NoSuchFile):
133
 
            return False
134
 
        return True
135
 
 
136
 
    def update_stats(self, url, success):
137
 
        self.stats.append((url, success, self.end_time - self.start_time))
138
 
 
139
 
 
140
 
class ActionScript(object):
141
 
    """This tracks the actions that we want to perform."""
142
 
 
143
 
    _worker_class = RequestWorker
144
 
    _default_base_url = 'http://localhost:8080'
145
 
    _default_blocking_timeout = 60.0
146
 
 
147
 
    def __init__(self):
148
 
        self.base_url = self._default_base_url
149
 
        self.blocking_timeout = self._default_blocking_timeout
150
 
        self._requests = []
151
 
        self._threads = {}
152
 
        self.stop_event = threading.Event()
153
 
 
154
 
    @classmethod
155
 
    def parse(cls, content):
156
 
        script = cls()
157
 
        json_dict = simplejson.loads(content)
158
 
        if 'parameters' not in json_dict:
159
 
            raise ValueError('Missing "parameters" section')
160
 
        if 'requests' not in json_dict:
161
 
            raise ValueError('Missing "requests" section')
162
 
        param_dict = json_dict['parameters']
163
 
        request_list = json_dict['requests']
164
 
        base_url = param_dict.get('base_url', None)
165
 
        if base_url is not None:
166
 
            script.base_url = base_url
167
 
        blocking_timeout = param_dict.get('blocking_timeout', None)
168
 
        if blocking_timeout is not None:
169
 
            script.blocking_timeout = blocking_timeout
170
 
        for request_dict in request_list:
171
 
            script.add_request(request_dict)
172
 
        return script
173
 
 
174
 
    def add_request(self, request_dict):
175
 
        request = RequestDescription(request_dict)
176
 
        self._requests.append(request)
177
 
 
178
 
    def _get_worker(self, thread_id):
179
 
        if thread_id in self._threads:
180
 
            return self._threads[thread_id][0]
181
 
        handler = self._worker_class(thread_id,
182
 
                                     blocking_time=self.blocking_timeout/10.0)
183
 
 
184
 
        t = threading.Thread(target=handler.run, args=(self.stop_event,),
185
 
                             name='Thread-%s' % (thread_id,))
186
 
        self._threads[thread_id] = (handler, t)
187
 
        t.start()
188
 
        return handler
189
 
 
190
 
    def finish_queues(self):
191
 
        """Wait for all queues of all children to finish."""
192
 
        for h, t in self._threads.itervalues():
193
 
            h.queue.join()
194
 
 
195
 
    def stop_and_join(self):
196
 
        """Stop all running workers, and return.
197
 
 
198
 
        This will stop even if workers still have work items.
199
 
        """
200
 
        self.stop_event.set()
201
 
        for h, t in self._threads.itervalues():
202
 
            # Signal the queue that it should stop blocking, we don't have to
203
 
            # wait for the queue to empty, because we may see stop_event before
204
 
            # we see the <noop>
205
 
            h.queue.put('<noop>')
206
 
            # And join the controlling thread
207
 
            for i in range(10):
208
 
                t.join(self.blocking_timeout / 10.0)
209
 
                if not t.isAlive():
210
 
                    break
211
 
 
212
 
    def _full_url(self, relpath):
213
 
        return self.base_url + relpath
214
 
 
215
 
    def run(self):
216
 
        self.stop_event.clear()
217
 
        for request in self._requests:
218
 
            full_url = self._full_url(request.relpath)
219
 
            worker = self._get_worker(request.thread)
220
 
            worker.queue.put(full_url, True, self.blocking_timeout)
221
 
        self.finish_queues()
222
 
        self.stop_and_join()
223
 
 
224
 
 
225
 
def run_script(filename):
226
 
    with open(filename, 'rb') as f:
227
 
        content = f.read()
228
 
    script = ActionScript.parse(content)
229
 
    script.run()
230
 
    return script