# Copyright 2009 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

import thread
import time

from paste.request import construct_url


def tabulate(cells):
    """Format a list of lists of strings in a table.

    The 'cells' are centered.

    >>> print ''.join(tabulate(
    ...     [['title 1', 'title 2'],
    ...      ['short', 'rather longer']]))
     title 1     title 2
      short   rather longer
    <BLANKLINE>
    """
    widths = {}
    for row in cells:
        for col_index, cell in enumerate(row):
            widths[col_index] = max(len(cell), widths.get(col_index, 0))
    result = []
    for row in cells:
        result_row = ''
        for col_index, cell in enumerate(row):
            result_row += cell.center(widths[col_index] + 2)
        result.append(result_row.rstrip() + '\n')
    return result


def threadpool_debug(app):
    """Wrap `app` to provide debugging information about the threadpool state.

    The returned application will serve debugging information about the state
    of the threadpool at '/thread-debug' -- but only when accessed directly,
    not when accessed through Apache.
    """
    def wrapped(environ, start_response):
        if ('HTTP_X_FORWARDED_SERVER' in environ
                or environ['PATH_INFO'] != '/thread-debug'):
            # Not the debug page: record when handling started (so the
            # debug page can report how long this request has really been
            # running) and pass the request through.
            environ['lp.timestarted'] = time.time()
            return app(environ, start_response)
        threadpool = environ['paste.httpserver.thread_pool']
        start_response("200 OK", [('Content-Type', 'text/plain')])
        output = [("url", "time running", "time since last activity")]
        now = time.time()
        # Because we're accessing mutable structures without locks here,
        # we're a bit cautious about things looking like we expect -- if a
        # worker doesn't seem fully set up, we just ignore it.
        for worker in threadpool.workers:
            if not hasattr(worker, 'thread_id'):
                continue
            time_started, info = threadpool.worker_tracker.get(
                worker.thread_id, (None, None))
            if time_started is not None and info is not None:
                real_time_started = info.get(
                    'lp.timestarted', time_started)
                output.append(
                    map(str,
                        (construct_url(info),
                         now - real_time_started,
                         now - time_started)))
        return tabulate(output)
    return wrapped
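
# A usage sketch: the '/thread-debug' page only works when the application
# is served by paste's threaded HTTP server, since that is what puts
# 'paste.httpserver.thread_pool' into the WSGI environ read above.
# 'demo_app' here is a hypothetical WSGI application:
#
#     from paste.httpserver import serve
#     serve(threadpool_debug(demo_app), host='127.0.0.1', port=8080,
#           use_threadpool=True)
#
# Fetching http://127.0.0.1:8080/thread-debug directly (with no
# X-Forwarded-Server header set by a proxy) then returns the table of
# worker threads.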


def change_kill_thread_criteria(application):
    """Interfere with threadpool so that threads are killed for inactivity.

    The usual rule with paste's threadpool is that a thread that takes longer
    than 'hung_thread_limit' seconds to process a request is considered hung,
    and one that takes more than 'kill_thread_limit' seconds is killed.

    Because loggerhead streams its output, how long the entire request takes
    to process depends on things like how fast the user's internet connection
    is.  What we'd like to do is kill threads that don't _start_ to produce
    output for 'kill_thread_limit' seconds.

    What this function actually does is arrange things so that threads that
    produce no output for 'kill_thread_limit' seconds are killed, because
    that's the rule Apache uses when interpreting ProxyTimeout.
    """
    def wrapped_application(environ, start_response):
        threadpool = environ['paste.httpserver.thread_pool']

        def reset_timer():
            """Make this thread safe for another 'kill_thread_limit' seconds.

            We do this by hacking the threadpool's record of when this thread
            started to pretend that it started right now.  Hacky, but it's
            enough to fool paste.httpserver.ThreadPool.kill_hung_threads and
            that's what matters.
            """
            threadpool.worker_tracker[thread.get_ident()][0] = time.time()

        def response_hook(status, response_headers, exc_info=None):
            # We reset the timer when the HTTP headers are sent...
            reset_timer()
            writer = start_response(status, response_headers, exc_info)
            def wrapped_writer(arg):
                # ... and whenever more output has been generated.
                reset_timer()
                return writer(arg)
            return wrapped_writer

        result = application(environ, response_hook)
        # WSGI allows the application to return an iterable, which could be
        # a generator that does significant processing between successive
        # items, so we should reset the timer between each item.
        #
        # This isn't really necessary, as loggerhead doesn't return any
        # non-trivial iterables to the WSGI server.  But it's probably
        # better to cope with this case to avoid nasty surprises if
        # loggerhead changes.
        def reset_timer_between_items(iterable):
            for item in iterable:
                reset_timer()
                yield item
        return reset_timer_between_items(result)
    return wrapped_application
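

# A minimal composition sketch: chain both middlewares around a trivial
# application and serve it with paste's threaded server.  'demo_app' and
# the limits passed via 'threadpool_options' are illustrative values, not
# Launchpad's real configuration.
if __name__ == '__main__':
    from paste.httpserver import serve

    def demo_app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['hello from the demo app\n']

    demo = change_kill_thread_criteria(threadpool_debug(demo_app))
    serve(demo, host='127.0.0.1', port=8080, use_threadpool=True,
          threadpool_options={'hung_thread_limit': 30,
                              'kill_thread_limit': 600})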