~launchpad-pqm/launchpad/devel

9590.1.135 by Michael Hudson
add files from launchpad-loggerhead tree to launchpad tree
1
# Copyright 2009 Canonical Ltd.  This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
4
import thread
5
import time
6
7
from paste.request import construct_url
8
9
10
def tabulate(cells):
    """Render 'cells', a list of rows of strings, as lines of a table.

    Each cell is centered within its column (columns are sized to the
    widest cell plus two spaces of padding).  Each returned line has
    trailing whitespace stripped and ends with a newline.

    >>> print ''.join(tabulate(
    ...     [['title 1', 'title 2'],
    ...      ['short', 'rather longer']]))
     title 1     title 2
      short   rather longer
    """
    # First pass: find the widest cell in every column.
    column_widths = {}
    for row in cells:
        for index, text in enumerate(row):
            if len(text) > column_widths.get(index, 0):
                column_widths[index] = len(text)
    # Second pass: center each cell and assemble the output lines.
    lines = []
    for row in cells:
        padded = [text.center(column_widths[index] + 2)
                  for index, text in enumerate(row)]
        lines.append(''.join(padded).rstrip() + '\n')
    return lines
32
33
34
def threadpool_debug(app):
    """Wrap `app` to provide debugging information about the threadpool state.

    The returned application serves a report on the state of the
    threadpool at '/thread-debug' -- but only when accessed directly,
    not when accessed through Apache (detected via the
    X-Forwarded-Server header).
    """
    def wrapped(environ, start_response):
        forwarded = 'HTTP_X_FORWARDED_SERVER' in environ
        if forwarded or environ['PATH_INFO'] != '/thread-debug':
            # Record when this request really began, so the debug page
            # can report total running time rather than just the time
            # since the threadpool last noticed activity.
            environ['lp.timestarted'] = time.time()
            return app(environ, start_response)
        threadpool = environ['paste.httpserver.thread_pool']
        start_response("200 Ok", [])
        now = time.time()
        rows = [("url", "time running", "time since last activity")]
        # We read the threadpool's mutable structures without taking any
        # locks, so be cautious: skip any worker that doesn't look fully
        # set up yet.
        for worker in threadpool.workers:
            if not hasattr(worker, 'thread_id'):
                continue
            time_started, info = threadpool.worker_tracker.get(
                worker.thread_id, (None, None))
            if time_started is None or info is None:
                continue
            # 'lp.timestarted' is stamped by us above; fall back to the
            # threadpool's own (activity-reset) timestamp if it's absent.
            real_time_started = info.get('lp.timestarted', time_started)
            rows.append([str(value) for value in
                         (construct_url(info),
                          now - real_time_started,
                          now - time_started)])
        return tabulate(rows)
    return wrapped
68
69
70
def change_kill_thread_criteria(application):
    """Interfere with threadpool so that threads are killed for inactivity.

    The usual rules with paste's threadpool is that a thread that takes longer
    than 'hung_thread_limit' seconds to process a request is considered hung
    and more than 'kill_thread_limit' seconds is killed.

    Because loggerhead streams its output, how long the entire request takes
    to process depends on things like how fast the user's internet connection
    is.  What we'd like to do is kill threads that don't _start_ to produce
    output for 'kill_thread_limit' seconds.

    What this wrapper actually does is arrange things so that threads that
    produce no output for 'kill_thread_limit' are killed, because that's the
    rule Apache uses when interpreting ProxyTimeout.
    """
    def wrapped_application(environ, start_response):
        threadpool = environ['paste.httpserver.thread_pool']

        def mark_thread_active():
            # Buy this thread another 'kill_thread_limit' seconds by
            # rewriting the threadpool's record of when it started to
            # "right now".  Hacky, but it's enough to fool
            # paste.httpserver.ThreadPool.kill_hung_threads, and that's
            # what matters.
            threadpool.worker_tracker[thread.get_ident()][0] = time.time()

        def response_hook(status, response_headers, exc_info=None):
            # Sending the HTTP headers counts as activity...
            mark_thread_active()
            writer = start_response(status, response_headers, exc_info)

            def wrapped_writer(data):
                # ... as does every chunk pushed through the legacy
                # write() callable.
                mark_thread_active()
                return writer(data)

            return wrapped_writer

        result = application(environ, response_hook)

        # WSGI lets the application return any iterable, which could be a
        # generator doing significant work between successive items, so
        # treat each yielded item as activity too.
        #
        # Loggerhead doesn't currently return non-trivial iterables to the
        # WSGI server, so this isn't strictly necessary -- but it avoids
        # nasty surprises if loggerhead ever changes.
        def keepalive_iter(iterable):
            for item in iterable:
                mark_thread_active()
                yield item

        return keepalive_iter(result)

    return wrapped_application