~launchpad-pqm/launchpad/devel

1102 by Canonical.com Patch Queue Manager
Lucille had some XXXs which should have been NOTEs
1
#!/usr/bin/env python
2
3
#############################################################################
4
# 
5
# Zope Public License (ZPL) Version 2.0
6
# -----------------------------------------------
7
# 
8
# This software is Copyright (c) Zope Corporation (tm) and
9
# Contributors. All rights reserved.
10
# 
11
# This license has been certified as open source. It has also
12
# been designated as GPL compatible by the Free Software
13
# Foundation (FSF).
14
# 
15
# Redistribution and use in source and binary forms, with or
16
# without modification, are permitted provided that the
17
# following conditions are met:
18
# 
19
# 1. Redistributions in source code must retain the above
20
#    copyright notice, this list of conditions, and the following
21
#    disclaimer.
22
# 
23
# 2. Redistributions in binary form must reproduce the above
24
#    copyright notice, this list of conditions, and the following
25
#    disclaimer in the documentation and/or other materials
26
#    provided with the distribution.
27
# 
28
# 3. The name Zope Corporation (tm) must not be used to
29
#    endorse or promote products derived from this software
30
#    without prior written permission from Zope Corporation.
31
# 
32
# 4. The right to distribute this software or to use it for
33
#    any purpose does not give you the right to use Servicemarks
34
#    (sm) or Trademarks (tm) of Zope Corporation. Use of them is
35
#    covered in a separate agreement (see
36
#    http://www.zope.com/Marks).
37
# 
38
# 5. If any files are modified, you must cause the modified
39
#    files to carry prominent notices stating that you changed
40
#    the files and the date of any change.
41
# 
42
# Disclaimer
43
# 
44
#   THIS SOFTWARE IS PROVIDED BY ZOPE CORPORATION ``AS IS''
45
#   AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
46
#   NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
47
#   AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN
48
#   NO EVENT SHALL ZOPE CORPORATION OR ITS CONTRIBUTORS BE
49
#   LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
50
#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
51
#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
52
#   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
53
#   HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
54
#   CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
55
#   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
56
#   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
57
#   DAMAGE.
58
# 
59
# 
60
# This software consists of contributions made by Zope
61
# Corporation and many individuals on behalf of Zope
62
# Corporation.  Specific attributions are listed in the
63
# accompanying credits file.
64
# 
65
#############################################################################
66
"""TCPWatch, a connection forwarder and HTTP proxy for monitoring connections.
67
68
Requires Python 2.1 or above.
69
70
Revision information:
71
$Id: tcpwatch.py,v 1.11 2004/10/04 14:30:25 jim Exp $
72
"""
73
74
from __future__ import nested_scopes
75
76
# Program version, also embedded in the copyright banner below.
VERSION = '1.3+'
# Banner shown at start-up (e.g. at the top of the Tk text pane).
COPYRIGHT = 'TCPWatch %s Copyright 2001 Shane Hathaway, Zope Corporation' % (
    VERSION,)
80
81
import sys
82
import os
83
import socket
84
import asyncore
85
import getopt
86
from time import time, localtime
87
88
89
# Largest number of bytes requested from a socket in a single recv() call.
RECV_BUFFER_SIZE = 8 * 1024
# When false, carriage returns are removed from displayed traffic.
show_cr = 0
91
92
93
#############################################################################
94
#
95
# Connection forwarder
96
#
97
#############################################################################
98
99
100
class ForwardingEndpoint (asyncore.dispatcher):
    """One side of a forwarded connection.

    Bytes read from this socket are handed to every destination stream
    registered with set_dests().  Bytes passed to write() are queued and
    sent on this socket once it is connected.
    """

    _dests = ()

    def __init__(self, conn=None):
        self._outbuf = []
        asyncore.dispatcher.__init__(self, conn)

    def set_dests(self, dests):
        """Set the streams that receive everything read from this socket.
        """
        self._dests = dests

    def write(self, data):
        """Queue data for sending and try to send it right away.

        Empty strings are ignored.
        """
        if not data:
            return
        self._outbuf.append(data)
        self.handle_write()

    def readable(self):
        return 1

    def writable(self):
        # Writable while not yet connected (so the connect completes) and
        # afterwards whenever queued output remains.
        return not self.connected or len(self._outbuf) > 0

    def handle_connect(self):
        # An empty write is the in-band signal that this socket connected.
        for dest in self._dests:
            dest.write('')

    def received(self, data):
        """Pass non-empty data on to every destination stream."""
        if not data:
            return
        for dest in self._dests:
            dest.write(data)

    def handle_read(self):
        self.received(self.recv(RECV_BUFFER_SIZE))

    def handle_write(self):
        if not self.connected:
            return  # Nothing can be sent until the connection completes.
        pending = self._outbuf
        while pending:
            chunk = pending.pop(0)
            if not chunk:
                continue
            count = self.send(chunk)
            if count < len(chunk):
                # Partial send: push the unsent tail back and wait for the
                # next writable event.
                pending.insert(0, chunk[count:])
                break

    def handle_close (self):
        # Detach the destinations first so they are closed only once.
        dests, self._dests = self._dests, ()
        for dest in dests:
            dest.close()
        self.close()

    def handle_error(self):
        exc_type, exc_value = sys.exc_info()[:2]
        # Only destinations that define error() (observers) are notified.
        for dest in self._dests:
            if hasattr(dest, 'error'):
                dest.error(exc_type, exc_value)
        self.handle_close()
164
165
166
167
class EndpointObserver:
    """Sends stream events to a ConnectionObserver.

    Streams don't say where their data came from, but ConnectionObservers
    need to know.  This adapter tags every event with the from_client flag
    it was created with.
    """

    def __init__(self, obs, from_client):
        self.obs = obs
        self.from_client = from_client

    def write(self, data):
        # An empty string means "connected"; anything else is payload.
        if not data:
            self.obs.connected(self.from_client)
            return
        self.obs.received(data, self.from_client)

    def close(self):
        self.obs.closed(self.from_client)

    def error(self, t, v):
        self.obs.error(self.from_client, t, v)
189
190
191
192
class ForwardedConnectionInfo:
    """Describes one forwarded connection (and an HTTP transaction in it)."""

    # Transaction number; copies made by dup() start again at this default.
    transaction = 1

    def __init__(self, connection_number, client_addr, server_addr=None):
        self.opened = time()  # When this info object was created.
        self.connection_number = connection_number
        self.client_addr = client_addr
        self.server_addr = server_addr

    def dup(self):
        """Return a new info object with the same number and addresses.

        The copy gets its own 'opened' time and the default transaction
        number.
        """
        return ForwardedConnectionInfo(self.connection_number,
                                       client_addr=self.client_addr,
                                       server_addr=self.server_addr)
205
206
207
208
class ForwardingService (asyncore.dispatcher):
    """Listens on a local port and forwards each accepted connection.

    For every client that connects, a socket is opened to
    (dest_host, dest_port) and two ForwardingEndpoints pass bytes between
    them.  If observer_factory is given, it is called with a
    ForwardedConnectionInfo for every connection, and the observer it
    returns is told about traffic in both directions.
    """

    _counter = 0

    def __init__(self, listen_host, listen_port, dest_host, dest_port,
                 observer_factory=None):
        self._obs_factory = observer_factory
        self._dest = (dest_host, dest_port)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((listen_host, listen_port))
        self.listen(5)

    def handle_accept(self):
        accepted = self.accept()
        if not accepted:
            return  # accept() produced no connection; nothing to do.
        conn, addr = accepted
        conn.setblocking(0)

        client_side = ForwardingEndpoint()  # connects client to self
        server_side = ForwardingEndpoint()  # connects self to server

        self._counter = self._counter + 1
        factory = self._obs_factory
        if factory is None:
            client_dests = (server_side,)
            server_dests = (client_side,)
        else:
            obs = factory(
                ForwardedConnectionInfo(self._counter, addr, self._dest))
            client_dests = (server_side, EndpointObserver(obs, 1))
            server_dests = (client_side, EndpointObserver(obs, 0))
        client_side.set_dests(client_dests)
        server_side.set_dests(server_dests)

        # Only now, with every destination wired up, let data flow.
        server_side.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        client_side.set_socket(conn)
        client_side.connected = 1  # The client is already connected.
        server_side.connect(self._dest)

    def handle_error(self):
        # Report the problem but keep the listening server running.
        import traceback
        traceback.print_exc()
257
258
259
260
class IConnectionObserver:
    """Interface for objects that watch a forwarded connection.

    In every method, from_client is true for events on the client side of
    the connection and false for events on the server side.
    """

    def connected(from_client):
        """Called when the client or the server side becomes connected.
        """

    def received(data, from_client):
        """Called with each piece of data sent by the client or the server.
        """

    def closed(from_client):
        """Called when the client or the server side closes the channel.
        """

    def error(from_client, type, value):
        """Called with the exception type and value when an error occurs
        on the client or the server channel.
        """
277
278
279
#############################################################################
280
#
281
# Basic abstract connection observer and stdout observer
282
#
283
#############################################################################
284
285
286
def escape(s):
    """Return str(s) with special characters backslash-escaped.

    For example, a string containing characters 0 and 1 is rendered as
    \\x00\\x01.  Newlines, tabs and single quotes are escaped as well.
    """
    # XXX daniels 2004-12-14:
    # This implementation might be a brittle trick. :-(
    # Putting both quote characters in front forces repr() to use single
    # quotes, so the prefix always renders as the four characters '"\' .
    # Slicing off those four and the closing quote leaves the escaped text.
    prefix = '"\''
    quoted = repr(prefix + str(s))
    return quoted[4:-1]
292
293
294
class BasicObserver:
    """Base connection observer that renders events as text.

    Subclasses supply write(s) and flush(); this class decides what text to
    write for each event.  Data is prefixed with an arrow chosen by
    from_client: '<==' for server data and '==>' for client data.
    """

    continuing_line = -1  # Tracks when a line isn't finished.
    arrows = ('<==', '==>')

    def __init__(self):
        # Reference point for the elapsed time shown in status lines.
        self._start = time()

    def _output_message(self, m, from_client):
        """Write a '[MM:SS.sss - <who> <m>]' status line and flush."""
        if self.continuing_line >= 0:
            # Terminate the unfinished data line first.
            self.write('\n')
            self.continuing_line = -1
        who = 'server'
        if from_client:
            who = 'client'
        minutes, seconds = divmod(time() - self._start, 60)
        line = '[%02d:%06.3f - %s %s]\n' % (minutes, seconds, who, m)
        self.write(line)
        self.flush()

    def connection_from(self, fci):
        """Announce a new connection and, when > 1, its transaction number.
        """
        client = tuple(fci.client_addr)
        if fci.server_addr is None:
            text = 'connection from %s:%s' % client
        else:
            text = '%s:%s forwarded to %s:%s' % (
                client + tuple(fci.server_addr))
        self._output_message(text, 1)
        if fci.transaction > 1:
            self._output_message('HTTP transaction #%d' % fci.transaction, 1)

    def connected(self, from_client):
        self._output_message('connected', from_client)

    def received(self, data, from_client):
        """Write data as escaped, arrow-prefixed lines.

        A chunk that does not end in a newline leaves the line open; the
        next chunk from the same side continues it.
        """
        arrow = self.arrows[from_client]
        previous = self.continuing_line
        if previous < 0:
            self.write(arrow)
        elif previous != from_client:
            # The other side was mid-line; start a new line for this side.
            self.write('\n%s' % arrow)

        ends_line = data.endswith('\n')
        if ends_line:
            data = data[:-1]
        if not show_cr:
            data = data.replace('\r', '')
        separator = '\n' + arrow
        self.write(separator.join(map(escape, data.split('\n'))))

        if ends_line:
            self.write('\n')
            self.continuing_line = -1
        else:
            self.continuing_line = from_client
        self.flush()

    def closed(self, from_client):
        self._output_message('closed', from_client)

    def error(self, from_client, type, value):
        self._output_message(
            'connection error %s: %s' % (type, value), from_client)

    def write(self, s):
        raise NotImplementedError

    def flush(self):
        raise NotImplementedError
375
            
376
377
class StdoutObserver (BasicObserver):
    """Connection observer that prints all activity to standard output."""

    # __implements__ = IConnectionObserver

    def __init__(self, fci):
        BasicObserver.__init__(self)
        # Announce the connection as soon as the observer exists.
        self.connection_from(fci)

    def write(self, s):
        out = sys.stdout
        out.write(s)

    def flush(self):
        out = sys.stdout
        out.flush()
390
391
392
# 'log_number' is a log file counter used for naming log files.
log_number = 0


def nextLogNumber():
    """Advance the module-wide log file counter and return the new value."""
    global log_number
    log_number += 1
    return log_number
399
400
401
class RecordingObserver (BasicObserver):
    """Log request to a file.

    o Each observer takes the next number from nextLogNumber() and uses it
      in its filenames: '<prefix><number>.<extension>', with the number
      zero-padded to four digits and the extension one of 'request',
      'response' or 'errors'.

    o Decorates an underlying observer, created via the passed 'sub_factory'.

    o Files are created in the supplied 'record_directory'.

    o Unless suppressed, log response and error to corresponding files.
    """
    _ERROR_SOURCES = ('Server', 'Client')

    # __implements__ = IConnectionObserver

    def __init__(self, fci, sub_factory, record_directory,
                 record_prefix='watch', record_responses=1, record_errors=1):
        self._log_number = nextLogNumber()
        self._decorated = sub_factory(fci)
        self._directory = record_directory
        self._prefix = record_prefix
        self._response = record_responses
        self._errors = record_errors

    def connected(self, from_client):
        """See IConnectionObserver.
        """
        self._decorated.connected(from_client)

    def received(self, data, from_client):
        """See IConnectionObserver.

        Client data is always appended to the '.request' file; server data
        goes to the '.response' file only when record_responses is true.
        """
        if from_client or self._response:
            if from_client:
                extension = 'request'
            else:
                extension = 'response'
            self._appendToFile(extension, data)
        self._decorated.received(data, from_client)

    def closed(self, from_client):
        """See IConnectionObserver.
        """
        self._decorated.closed(from_client)

    def error(self, from_client, type, value):
        """See IConnectionObserver.

        When record_errors is true, one '(Server|Client) type: value' line
        is appended to the '.errors' file.
        """
        if self._errors:
            self._appendToFile(
                'errors', '(%s) %s: %s\n' % (self._ERROR_SOURCES[from_client],
                                             type, value))
        self._decorated.error(from_client, type, value)

    def _appendToFile(self, extension, text):
        """Append text to the log file for extension, always closing it."""
        f = self._openForAppend(extension=extension)
        try:
            f.write(text)
        finally:
            # Previously error() never closed its file; close every time.
            f.close()

    def _openForAppend(self, extension):
        """Open a file with the given extension for appending.

        o File should be in the directory indicated by self._directory.

        o File should have a filename '<prefix><conn #>.<extension>', where
          <conn #> is self._log_number zero-padded to four digits.
        """
        filename = '%s%04d.%s' % (self._prefix, self._log_number, extension)
        fqpath = os.path.join(self._directory, filename)
        return open(fqpath, 'ab')
465
466
467
#############################################################################
468
#
469
# Tkinter GUI
470
#
471
#############################################################################
472
473
474
def setupTk(titlepart, config_info, colorized=1):
    """Create the Tk user interface for TCPWatch.

    titlepart is shown in the window title as 'TCPWatch [<titlepart>]'.
    config_info is %-interpolated into the start-up text in the text pane
    (presumably a description of the configured proxies; confirm against
    the caller).  colorized selects how received data is shown: 0 uses the
    plain BasicObserver rendering, 1 colors client and server data
    differently, and 2 also highlights escaped characters.

    Returns a pair (observer_factory, mainloop).  observer_factory(fci)
    creates a TkConnectionObserver for a new connection, and mainloop runs
    the Tk event loop.
    """

    import Tkinter
    from ScrolledText import ScrolledText
    from Queue import Queue, Empty

    startup_text = COPYRIGHT + ("""

Use your client to connect to the proxied port(s) then click
the list on the left to see the data transferred.

%s
""" % config_info)


    class TkTCPWatch (Tkinter.Frame):
        '''The tcpwatch top-level window.

        Shows the list of connections on the left and a text pane on the
        right.  Observers do not touch the widgets directly: they put
        (function, args) pairs on self.queue, and processQueue() runs them
        from Tk's event loop.
        '''
        def __init__(self, master):
            Tkinter.Frame.__init__(self, master)
            self.createWidgets()
            # connections maps ids to TkConnectionObservers.
            self.connections = {}
            # Id of the connection displayed in the text pane ('' = none).
            self.showingid = ''
            self.queue = Queue()
            self.processQueue()

        def createWidgets(self):
            listframe = Tkinter.Frame(self)
            listframe.pack(side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
            scrollbar = Tkinter.Scrollbar(listframe, orient=Tkinter.VERTICAL)
            self.connectlist = Tkinter.Listbox(
                listframe, yscrollcommand=scrollbar.set, exportselection=0)
            scrollbar.config(command=self.connectlist.yview)
            scrollbar.pack(side=Tkinter.RIGHT, fill=Tkinter.Y)
            self.connectlist.pack(
                side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
            self.connectlist.bind('<Button-1>', self.mouseListSelect)
            self.textbox = ScrolledText(self, background="#ffffff")
            self.textbox.tag_config("message", foreground="#000000")
            self.textbox.tag_config("client", foreground="#007700")
            self.textbox.tag_config(
                "clientesc", foreground="#007700", background="#dddddd")
            self.textbox.tag_config("server", foreground="#770000")
            self.textbox.tag_config(
                "serveresc", foreground="#770000", background="#dddddd")
            self.textbox.insert(Tkinter.END, startup_text, "message")
            self.textbox.pack(side='right', fill=Tkinter.BOTH, expand=1)
            self.pack(fill=Tkinter.BOTH, expand=1)

        def addConnection(self, id, conn):
            self.connections[id] = conn
            connectlist = self.connectlist
            connectlist.insert(Tkinter.END, id)

        def updateConnection(self, id, output):
            # Only append live output for the connection being displayed.
            if id == self.showingid:
                textbox = self.textbox
                for data, style in output:
                    textbox.insert(Tkinter.END, data, style)

        def mouseListSelect(self, event=None):
            connectlist = self.connectlist
            if event is None:
                # Called without a mouse event: show the active entry
                # instead of crashing on event.y.
                idx = connectlist.index(Tkinter.ACTIVE)
            else:
                idx = connectlist.nearest(event.y)
            sel = connectlist.get(idx)
            connections = self.connections
            if connections.has_key(sel):
                # Clear showingid while redrawing so updateConnection does
                # not append to the pane in the middle of the refill.
                self.showingid = ''
                output = connections[sel].getOutput()
                self.textbox.delete(1.0, Tkinter.END)
                for data, style in output:
                    self.textbox.insert(Tkinter.END, data, style)
                self.showingid = sel

        def processQueue(self):
            try:
                if not self.queue.empty():
                    # Process messages for up to 1/4 second
                    limit = time() + 0.25
                    while time() < limit:
                        try:
                            f, args = self.queue.get_nowait()
                        except Empty:
                            break
                        f(*args)
            finally:
                # Poll again in 50 ms, even if a queued call raised.
                self.master.after(50, self.processQueue)


    class TkConnectionObserver (BasicObserver):
        '''A connection observer which shows captured data in a TCPWatch
        frame.  The data is mangled for presentation.

        Everything displayed is also kept in self._output as (text, style)
        pairs, so the connection can be redrawn when it is selected.
        '''
        # __implements__ = IConnectionObserver

        def __init__(self, frame, fci, colorized=1):
            BasicObserver.__init__(self)
            self._output = []  # list of tuples containing (text, style)
            self._frame = frame
            self._colorized = colorized
            t = localtime(fci.opened)
            if fci.transaction > 1:
                base_id = '%03d-%02d' % (
                    fci.connection_number, fci.transaction)
            else:
                base_id = '%03d' % fci.connection_number
            # The list entry reads e.g. '001 (HH:MM:SS)'.
            id = '%s (%02d:%02d:%02d)' % (base_id, t[3], t[4], t[5])
            self._id = id
            frame.queue.put((frame.addConnection, (id, self)))
            self.connection_from(fci)

        def write(self, s):
            output = [(s, "message")]
            self._output.extend(output)
            self._frame.queue.put(
                (self._frame.updateConnection, (self._id, output)))

        def flush(self):
            pass

        def received(self, data, from_client):
            if not self._colorized:
                BasicObserver.received(self, data, from_client)
                return

            if not show_cr:
                data = data.replace('\r', '')

            output = []

            extra_color = (self._colorized == 2)

            if extra_color:
                # 4 colors: Change the color client/server and escaped chars
                def append(ss, escaped, output=output,
                           from_client=from_client, escape=escape):
                    if escaped:
                        output.append((escape(ss), from_client
                                       and 'clientesc' or 'serveresc'))
                    else:
                        output.append((ss, from_client
                                       and 'client' or 'server'))
            else:
                # 2 colors: Only change color for client/server
                segments = []
                def append(ss, escaped, segments=segments,
                           escape=escape):
                    if escaped:
                        segments.append(escape(ss))
                    else:
                        segments.append(ss)

            # Escape the input data: split it into runs of characters that
            # do / do not need escaping and hand each run to append().
            was_escaped = 0
            start_idx = 0
            for idx in xrange(len(data)):
                c = data[idx]
                escaped = (c < ' ' and c != '\n') or c >= '\x80'
                if was_escaped != escaped:
                    ss = data[start_idx:idx]
                    if ss:
                        append(ss, was_escaped)
                    was_escaped = escaped
                    start_idx = idx
            ss = data[start_idx:]
            if ss:
                append(ss, was_escaped)

            if not extra_color:
                output.append((''.join(segments),
                               from_client and 'client' or 'server'))

            # Send output to the frame.
            self._output.extend(output)
            self._frame.queue.put(
                (self._frame.updateConnection, (self._id, output)))
            if data.endswith('\n'):
                self.continuing_line = -1
            else:
                self.continuing_line = from_client

        def getOutput(self):
            return self._output


    def createApp(titlepart):
        master = Tkinter.Tk()
        app = TkTCPWatch(master)
        try:
            wm_title = app.master.wm_title
        except AttributeError:
            pass  # No wm_title method available.
        else:
            wm_title('TCPWatch [%s]' % titlepart)
        return app

    app = createApp(titlepart)

    def tkObserverFactory(fci, app=app, colorized=colorized):
        return TkConnectionObserver(app, fci, colorized)

    return tkObserverFactory, app.mainloop
683
684
685
686
#############################################################################
687
#
688
# The HTTP splitter
689
#
690
# Derived from Zope.Server.HTTPServer.
691
#
692
#############################################################################
693
694
695
def find_double_newline(s):
    """Returns the position just after the double newline.

    Both '\\n\\r\\n' and '\\n\\n' count as a double newline, and the one
    that ends earliest wins.  Returns -1 when s contains neither.
    """
    ends = []
    for marker in ('\n\r\n', '\n\n'):
        at = s.find(marker)
        if at >= 0:
            ends.append(at + len(marker))
    if not ends:
        return -1
    return min(ends)
711
712
713
714
class StreamedReceiver:
    """Accepts data up to a specific limit (e.g. a Content-Length body).

    If buf is given, accepted data is appended to it.
    """

    completed = 0

    def __init__(self, cl, buf=None):
        self.remain = cl
        self.buf = buf
        if cl < 1:
            self.completed = 1

    def received(self, data):
        """Accept up to the remaining byte count from data.

        Returns the number of bytes taken and sets completed once the
        limit is reached.
        """
        left = self.remain
        if left < 1:
            self.completed = 1  # Avoid any chance of spinning
            return 0
        take = min(left, len(data))
        if self.buf is not None:
            self.buf.append(data[:take])
        self.remain = left - take
        if self.remain == 0:
            self.completed = 1
        return take
743
744
745
746
class UnlimitedReceiver:
    """Accepts data without limits.

    It never sets completed itself; the body simply lasts until the
    connection ends.
    """

    completed = 0

    def received(self, data):
        # Every byte offered belongs to the body.
        consumed = len(data)
        return consumed
754
755
756
757
class ChunkedReceiver:
    """Accepts a chunked transfer-encoded body, including any trailer.

    Chunk payloads are appended to buf when one is given.  received()
    returns how many bytes of its argument belong to the body, so the
    caller can tell where the next message starts.
    """

    chunk_remainder = 0       # Payload bytes still expected for this chunk.
    control_line = ''         # Partial chunk-size line carried between calls.
    all_chunks_received = 0   # Set after the zero-size last chunk.
    trailer = ''              # Partial (then final) trailer text.
    completed = 0


    def __init__(self, buf=None):
        self.buf = buf

    def received(self, s):
        # Returns the number of bytes consumed.
        if self.completed:
            return 0
        total = len(s)
        rest = s
        while rest:
            if self.chunk_remainder > 0:
                rest = self._consume_chunk_data(rest)
            elif not self.all_chunks_received:
                rest = self._consume_control_line(rest)
            else:
                # Receive the trailer.  Bytes held in self.trailer were
                # already counted by an earlier call, so the return values
                # below subtract what is left over after the trailer ends.
                trailer = self.trailer + rest
                if trailer.startswith('\r\n'):
                    # No trailer.
                    self.completed = 1
                    return total - (len(trailer) - 2)
                if trailer.startswith('\n'):
                    # No trailer.
                    self.completed = 1
                    return total - (len(trailer) - 1)
                end = find_double_newline(trailer)
                if end < 0:
                    # Trailer not finished.
                    self.trailer = trailer
                    rest = ''
                else:
                    # Finished the trailer.
                    self.completed = 1
                    self.trailer = trailer[:end]
                    return total - (len(trailer) - end)
        return total

    def _consume_chunk_data(self, data):
        """Take up to chunk_remainder payload bytes; return what is left."""
        piece = data[:self.chunk_remainder]
        if self.buf is not None:
            self.buf.append(piece)
        self.chunk_remainder = self.chunk_remainder - len(piece)
        return data[len(piece):]

    def _consume_control_line(self, data):
        """Parse (or buffer) a chunk-size line; return what is left."""
        data = self.control_line + data
        newline_at = data.find('\n')
        if newline_at < 0:
            # Control line not finished.
            self.control_line = data
            return ''
        line = data[:newline_at].strip()
        self.control_line = ''
        if line:
            # Discard any ';extension' part; the size is hexadecimal.
            size = int(line.split(';', 1)[0].strip(), 16)
            if size > 0:
                self.chunk_remainder = size
            else:
                # The zero-size chunk ends the body.
                self.all_chunks_received = 1
        # A blank line (the CRLF after chunk data) is simply skipped.
        return data[newline_at + 1:]
835
836
837
838
class HTTPStreamParser:
    """A structure that parses the HTTP stream.

    One instance parses one request (is_a_request true) or one response.
    received() reports how many bytes belong to this message, so the caller
    can split a persistent connection into separate transactions.
    """

    completed = 0    # Set once request is completed.
    empty = 0        # Set if no request was made.
    header_plus = ''
    chunked = 0
    content_length = 0
    body_rcv = None

    # Response status codes whose messages never carry a body.
    _NO_BODY_STATUSES = ('204', '304')

    # headers is a mapping containing keys translated to uppercase
    # with dashes turned into underscores.

    def __init__(self, is_a_request):
        self.headers = {}
        self.is_a_request = is_a_request
        self.body_data = []

    def received(self, data):
        """Receives the HTTP stream for one request.

        Returns the number of bytes consumed.
        Sets the completed flag once both the header and the
        body have been received.
        """
        if self.completed:
            return 0  # Can't consume any more.
        datalen = len(data)
        br = self.body_rcv
        if br is None:
            # In header.
            s = self.header_plus + data
            index = find_double_newline(s)
            if index >= 0:
                # Header finished.
                header_plus = s[:index]
                # Only count bytes from this call: the buffered
                # self.header_plus part was consumed by earlier calls.
                consumed = len(data) - (len(s) - index)
                self.in_header = 0
                # Remove preceding blank lines.
                header_plus = header_plus.lstrip()
                if not header_plus:
                    self.empty = 1
                    self.completed = 1
                else:
                    self.parse_header(header_plus)
                    if self.body_rcv is None or self.body_rcv.completed:
                        self.completed = 1
                return consumed
            else:
                # Header not finished yet.
                self.header_plus = s
                return datalen
        else:
            # In body.
            consumed = br.received(data)
            self.body_data.append(data[:consumed])
            if br.completed:
                self.completed = 1
            return consumed


    def parse_header(self, header_plus):
        """Parses the header_plus block of text.

        (header_plus is the headers plus the first line of the request).
        Fills self.headers and chooses self.body_rcv, the receiver that
        decides where the message body ends.
        """
        index = header_plus.find('\n')
        if index >= 0:
            first_line = header_plus[:index]
            header = header_plus[index + 1:]
        else:
            first_line = header_plus
            header = ''
        self.first_line = first_line
        self.header = header

        lines = self.get_header_lines()
        headers = self.headers
        for line in lines:
            index = line.find(':')
            if index > 0:
                key = line[:index]
                value = line[index + 1:].strip()
                key1 = key.upper().replace('-', '_')
                headers[key1] = value
            # else there's garbage in the headers?

        if not self.is_a_request:
            # 204 and 304 responses never have a body.
            parts = first_line.split()
            if len(parts) >= 2 and parts[1] in self._NO_BODY_STATUSES:
                # Expect no body.
                self.body_rcv = StreamedReceiver(0)

        if self.body_rcv is None:
            # Ignore the HTTP version and just assume
            # that the Transfer-Encoding header, when supplied, is valid.
            # Transfer codings are case-insensitive and may be a list; the
            # body is chunked when 'chunked' is the final coding.
            te = headers.get('TRANSFER_ENCODING', '')
            codings = te.lower().split(',')
            if codings[-1].strip() == 'chunked':
                self.chunked = 1
                self.body_rcv = ChunkedReceiver()
            if not self.chunked:
                try:
                    cl = int(headers.get('CONTENT_LENGTH', -1))
                except ValueError:
                    # Malformed Content-Length: treat it as absent instead
                    # of letting the exception tear down the connection.
                    cl = -1
                self.content_length = cl
                if cl >= 0 or self.is_a_request:
                    self.body_rcv = StreamedReceiver(cl)
                else:
                    # No content length and this is a response.
                    # We have to assume unlimited content length.
                    self.body_rcv = UnlimitedReceiver()


    def get_header_lines(self):
        """Splits the header into lines, putting multi-line headers together.

        A line starting with a space or tab continues the previous line; it
        is appended without its first character.  A leading continuation
        line with nothing before it is kept as a line of its own.
        """
        r = []
        lines = self.header.split('\n')
        for line in lines:
            if line.endswith('\r'):
                line = line[:-1]
            if line and line[0] in ' \t' and r:
                r[-1] = r[-1] + line[1:]
            else:
                r.append(line)
        return r
964
965
966
967
class HTTPConnectionSplitter:
    """Connection observer that fans one persistent HTTP connection out
    into one observer per request/response transaction.

    Connection-level events (connected, closed, error) go to the newest
    transaction's observer.  Data is routed, per direction, to the
    observer of the transaction whose request (client side) or response
    (server side) is currently being parsed.
    """

    # __implements__ = IConnectionObserver

    # Positions in self.transactions of the request and the response
    # currently being parsed.
    req_index = 0
    resp_index = 0

    def __init__(self, sub_factory, fci):
        self.sub_factory = sub_factory
        # Each entry is (observer, request_parser, response_parser).
        self.transactions = []
        self.fci = fci
        self._newTransaction()

    def _newTransaction(self):
        """Appends a new (observer, request parser, response parser) entry."""
        info = self.fci.dup()
        info.transaction = len(self.transactions) + 1
        entry = (self.sub_factory(info),
                 HTTPStreamParser(1),
                 HTTPStreamParser(0))
        self.transactions.append(entry)

    def _mostRecentObs(self):
        """Returns the observer of the newest transaction."""
        newest = self.transactions[-1]
        return newest[0]

    def connected(self, from_client):
        self._mostRecentObs().connected(from_client)

    def closed(self, from_client):
        self._mostRecentObs().closed(from_client)

    def error(self, from_client, type, value):
        self._mostRecentObs().error(from_client, type, value)

    def received(self, data, from_client):
        """Feeds data to the active parser for its direction.

        Forwards the consumed bytes to that transaction's observer and
        moves on to the next transaction whenever a message completes.
        """
        while data:
            if from_client:
                position = self.req_index
            else:
                position = self.resp_index
            if position >= len(self.transactions):
                self._newTransaction()

            entry = self.transactions[position]
            observer = entry[0]
            if from_client:
                parser = entry[1]
            else:
                parser = entry[2]

            used = parser.received(data)
            observer.received(data[:used], from_client)
            data = data[used:]
            if not parser.completed:
                continue
            # This message is done; the next bytes belong to the next
            # transaction in the same direction.
            if from_client:
                self.req_index = position + 1
            else:
                self.resp_index = position + 1
1027
1028
#############################################################################
1029
#
1030
# HTTP proxy
1031
#
1032
#############################################################################
1033
1034
1035
class HTTPProxyToServerConnection (ForwardingEndpoint):
    """Proxy-side connection to an upstream HTTP server.

    A client may pipeline several requests over one persistent
    connection, and the responses must reach it in request order.
    Response bytes are therefore held here until this connection is at
    the head of the client connection's ``_response_order`` queue.
    """

    finished = 0

    def __init__(self, proxy_conn, dests=()):
        ForwardingEndpoint.__init__(self)
        self.response_parser = HTTPStreamParser(0)
        self.proxy_conn = proxy_conn
        self.direct = 0
        self.set_dests(dests)

        # Response bytes waiting until earlier responses have been sent.
        self.held = []

    def set_direct(self):
        """Switches to pass-through (CONNECT tunnel) mode."""
        self.direct = 1

    def handle_connect(self):
        ForwardingEndpoint.handle_connect(self)
        if not self.direct:
            return
        # CONNECT: feed the proxy's own success reply into the response
        # stream, as if the server had sent it.
        self.received('HTTP/1.1 200 Connection established\r\n\r\n')

    def _isMyTurn(self):
        """Returns a true value when this connection may write its
        response to the client."""
        queue = self.proxy_conn._response_order
        if not queue:
            return 1
        return (queue[0] is self)

    def received(self, data):
        """Receives data from the HTTP server to be sent back to the client."""
        if self.direct:
            # Tunnel mode: no HTTP parsing, relay everything.
            ForwardingEndpoint.received(self, data)
            self.held.append(data)
            self.flush()
            return
        while not self.response_parser.completed:
            if not data:
                return
            used = self.response_parser.received(data)
            chunk, data = data[:used], data[used:]
            ForwardingEndpoint.received(self, chunk)
            self.held.append(chunk)
            self.flush()
        # The whole response has arrived.
        # TODO: it would be nice to reuse proxy connections
        # rather than close them every time.
        self.finished = 1
        self.close()

    def flush(self):
        """Writes held data if it is this connection's turn.

        Once the response is finished, removes this connection from the
        head of the queue and prompts the next one to flush.
        """
        if self.held and self._isMyTurn():
            pending = ''.join(self.held)
            del self.held[:]
            self.proxy_conn.write(pending)
        if not self.finished:
            return
        queue = self.proxy_conn._response_order
        if queue and queue[0] is self:
            del queue[0]
        if queue:
            queue[0].flush()  # let the next response go out

    def handle_close(self):
        """The HTTP server closed the connection.
        """
        ForwardingEndpoint.handle_close(self)
        if self.finished:
            return
        # HTTP cannot recover from a truncated response, so the whole
        # client connection is dropped, even with other responses pending.
        # NOTE(review): a response with no Content-Length (UnlimitedReceiver)
        # presumably ends only when the server closes, and would then land
        # here with finished == 0 -- confirm whether that is intended.
        self.proxy_conn.close()

    def close(self):
        """Closes the connection to the server.

        If the response is unfinished (and this is not a CONNECT tunnel),
        every destination that has an ``error`` method is notified first.
        """
        self.flush()
        if not (self.finished or self.direct):
            for dest in self._dests:
                if hasattr(dest, 'error'):
                    dest.error(IOError,
                               'Closed without finishing response to client')
        ForwardingEndpoint.close(self)
1130
1131
1132
1133
class HTTPProxyToClientConnection (ForwardingEndpoint):
    """A connection from a client to the proxy server.

    Parses the requests the client sends and opens one
    HTTPProxyToServerConnection per request.  Those server connections
    are queued in ``_response_order`` so responses are written back in
    request order.  A CONNECT request switches this connection into
    direct (tunnelling) mode.
    """

    _req_parser = None
    _transaction = 0
    _obs = None

    def __init__(self, conn, factory, counter, addr):
        ForwardingEndpoint.__init__(self, conn)
        self._obs_factory = factory
        self._counter = counter
        self._client_addr = addr
        # Set to the server connection once a CONNECT tunnel is open;
        # later client data is passed to it unparsed.
        self._direct_receiver = None
        # Server connections whose responses are still owed to the
        # client, oldest first.
        self._response_order = []
        self._newRequest()

    def _newRequest(self):
        """Starts a new request on a persistent connection."""
        if self._req_parser is None:
            self._req_parser = HTTPStreamParser(1)
        factory = self._obs_factory
        if factory is not None:
            fci = ForwardedConnectionInfo(self._counter, self._client_addr)
            self._transaction = self._transaction + 1
            fci.transaction = self._transaction
            obs = factory(fci)
            self._obs = obs
            self.set_dests((EndpointObserver(obs, 1),))

    def received(self, data):
        """Accepts data received from the client."""
        while data:
            if self._direct_receiver is not None:
                # Direct-connect mode
                self._direct_receiver.write(data)
                ForwardingEndpoint.received(self, data)
                return
            parser = self._req_parser
            if parser is None:
                # Begin another request.
                self._newRequest()
                parser = self._req_parser
            if not parser.completed:
                # Waiting for a complete request.
                consumed = parser.received(data)
                ForwardingEndpoint.received(self, data[:consumed])
                data = data[consumed:]
            if parser.completed:
                # Connect to a server.
                self.openProxyConnection(parser)
                # Expect a new request or a closed connection.
                self._req_parser = None

    def openProxyConnection(self, request):
        """Parses a completed client request and opens a connection to
        the HTTP server it targets.

        Three request forms are accepted:
        - absolute URLs, as sent to a standard proxy ("GET http://host/p");
        - a bare host[:port], as sent with CONNECT;
        - a path plus a Host header (transparent proxying).
        Any "user[:password]@" prefix is dropped from the host.  A missing
        or empty port means port 80.

        Raises ValueError if the request line is malformed or no target
        host can be found.
        """
        first_line = request.first_line.strip()
        if ' ' not in first_line:
            raise ValueError('Malformed request: %s' % first_line)
        command, url = first_line.split(' ', 1)
        pos = url.rfind(' HTTP/')
        if pos >= 0:
            protocol = url[pos + 1:]
            url = url[:pos].rstrip()
        else:
            protocol = 'HTTP/1.0'
        if url.startswith('http://'):
            # Standard proxy
            urlpart = url[7:]
            if '/' in urlpart:
                host, path = urlpart.split('/', 1)
                path = '/' + path
            else:
                host = urlpart
                path = '/'
        elif '/' not in url:
            # Only a host name (probably using CONNECT)
            host = url
            path = ''
        else:
            # Transparent proxy
            host = request.headers.get('HOST')
            path = url
        if not host:
            raise ValueError('Request type not supported: %s' % url)

        if '@' in host:
            # Drop the credentials.  The host follows the *last* '@', so a
            # password containing '@' no longer breaks the unpacking.
            host = host.split('@')[-1]

        port = 80
        if ':' in host:
            host, port_text = host.split(':', 1)
            # "host:" with an empty port means the default port.
            if port_text:
                port = int(port_text)

        obs = self._obs
        if obs is not None:
            eo = EndpointObserver(obs, 0)
            ptos = HTTPProxyToServerConnection(self, (eo,))
        else:
            ptos = HTTPProxyToServerConnection(self)

        self._response_order.append(ptos)

        if command == 'CONNECT':
            # Reply, then send the remainder of the connection
            # directly to the server.
            self._direct_receiver = ptos
            ptos.set_direct()
        else:
            ptos.write('%s %s %s\r\n' % (command, path, protocol))
            # Duplicate the headers sent by the client.
            if request.header:
                ptos.write(request.header)
            else:
                ptos.write('\r\n')
            if request.body_data:
                ptos.write(''.join(request.body_data))
        ptos.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        ptos.connect((host, port))

    def close(self):
        """Closes the connection to the client.

        If there are open connections to proxy servers, the server
        connections are also closed.
        """
        ForwardingEndpoint.close(self)
        # Iterate over a copy.  Closing a server connection flushes it,
        # and a finished connection's flush() removes itself from
        # _response_order, which would make a live iteration skip entries.
        for ptos in self._response_order[:]:
            ptos.close()
        del self._response_order[:]
1265
1266
1267
class HTTPProxyService (asyncore.dispatcher):
    """A minimal HTTP proxy server.

    Listens on (listen_host, listen_port).  Each accepted socket is handed
    to ``connection_class`` together with the observer factory, a running
    connection number, and the client address.
    """

    connection_class = HTTPProxyToClientConnection

    # Count of accepted connections.  After the first accept, the
    # instance attribute shadows this class-level default.
    _counter = 0

    def __init__(self, listen_host, listen_port, observer_factory=None):
        self._obs_factory = observer_factory
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        address = (listen_host, listen_port)
        self.bind(address)
        backlog = 5
        self.listen(backlog)

    def handle_accept(self):
        """Wraps a newly accepted client socket in a proxy connection."""
        accepted = self.accept()
        if not accepted:
            return
        conn, addr = accepted
        conn.setblocking(0)
        self._counter = self._counter + 1
        self.connection_class(conn, self._obs_factory, self._counter, addr)

    def handle_error(self):
        """Prints the traceback and keeps serving instead of stopping."""
        from traceback import print_exc
        print_exc()
1296
1297
1298
#############################################################################
1299
#
1300
# Command-line interface
1301
#
1302
#############################################################################
1303
1304
def usage(exit_code=0):
    """Writes the copyright notice and the command-line help to stderr,
    then exits.

    exit_code is passed to sys.exit().  It defaults to 0, so an explicit
    ``--help`` still exits successfully.
    """
    sys.stderr.write(COPYRIGHT + '\n')
    sys.stderr.write(
        """TCP monitoring and logging tool with support for HTTP 1.1
Simple usage: tcpwatch.py -L listen_port:dest_hostname:dest_port

TCP forwarded connection setup:
  -L <listen_port>:<dest_port>
     Set up a local forwarded connection
  -L <listen_port>:<dest_host>:<dest_port>
     Set up a forwarded connection to a specified host
  -L <listen_host>:<listen_port>:<dest_host>:<dest_port>
     Set up a forwarded connection to a specified host, bound to an interface

HTTP setup:
  -h (or --http) Split forwarded HTTP persistent connections
  -p [<listen_host>:]<listen_port> Run an HTTP proxy

Output options:
  -s   Output to stdout instead of a Tkinter window
  -n   No color in GUI (faster and consumes less RAM)
  -c   Extra color (colorizes escaped characters)
  --cr     Show carriage returns (ASCII 13)
  --help   Show usage information

Recording options:
  -r <path>  (synonyms: -R, --record-directory)
    Write recorded data to <path>.  By default, creates request and
    response files for each request, and writes a corresponding error file
    for any error detected by tcpwatch.
  --record-prefix=<prefix>
    Use <prefix> as the file prefix for logged request / response / error
    files (defaults to 'watch').
  --no-record-responses
    Suppress writing '.response' files.
  --no-record-errors
    Suppress writing '.error' files.
""")
    sys.exit(exit_code)


def usageError(s):
    """Reports a command-line error, prints the help text, and exits.

    Exits with status 2, the conventional status for command-line syntax
    errors, so that scripts can detect the failure.  Previously it exited
    with 0.
    """
    sys.stderr.write(str(s) + '\n\n')
    usage(2)
1348
1349
1350
def main(args):
1351
    global show_cr
1352
1353
    try:
1354
        optlist, extra = getopt.getopt(args, 'chL:np:r:R:s',
1355
                                       ['help', 'http', 'cr',
1356
                                        'record-directory=',
1357
                                        'record-prefix=',
1358
                                        'no-record-responses',
1359
                                        'no-record-errors',
1360
                                       ])
1361
    except getopt.GetoptError, msg:
1362
        usageError(msg)
1363
1364
    fwd_params = []
1365
    proxy_params = []
1366
    obs_factory = None
1367
    show_config = 0
1368
    split_http = 0
1369
    colorized = 1
1370
    record_directory = None
1371
    record_prefix = 'watch'
1372
    record_responses = 1
1373
    record_errors = 1
1374
    recording = {}
1375
1376
    for option, value in optlist:
1377
        if option == '--help':
1378
            usage()
1379
        elif option == '--http' or option == '-h':
1380
            split_http = 1
1381
        elif option == '-n':
1382
            colorized = 0
1383
        elif option == '-c':
1384
            colorized = 2
1385
        elif option == '--cr':
1386
            show_cr = 1
1387
        elif option == '-s':
1388
            show_config = 1
1389
            obs_factory = StdoutObserver
1390
        elif option == '-p':
1391
            # HTTP proxy
1392
            info = value.split(':')
1393
            listen_host = ''
1394
            if len(info) == 1:
1395
                listen_port = int(info[0])
1396
            elif len(info) == 2:
1397
                listen_host = info[0]
1398
                listen_port = int(info[1])
1399
            else:
1400
                usageError('-p requires a port or a host:port parameter')
1401
            proxy_params.append((listen_host, listen_port))
1402
        elif option == '-L':
1403
            # TCP forwarder
1404
            info = value.split(':')
1405
            listen_host = ''
1406
            dest_host = ''
1407
            if len(info) == 2:
1408
                listen_port = int(info[0])
1409
                dest_port = int(info[1])
1410
            elif len(info) == 3:
1411
                listen_port = int(info[0])
1412
                dest_host = info[1]
1413
                dest_port = int(info[2])
1414
            elif len(info) == 4:
1415
                listen_host = info[0]
1416
                listen_port = int(info[1])
1417
                dest_host = info[2]
1418
                dest_port = int(info[3])
1419
            else:
1420
                usageError('-L requires 2, 3, or 4 colon-separated parameters')
1421
            fwd_params.append(
1422
                (listen_host, listen_port, dest_host, dest_port))
1423
        elif (option == '-r'
1424
              or option == '-R'
1425
              or option == '--record-directory'):
1426
            record_directory = value
1427
        elif option == '--record-prefix':
1428
            record_prefix = value
1429
        elif option == '--no-record-responses':
1430
            record_responses = 0
1431
        elif option == '--no-record-errors':
1432
            record_errors = 0
1433
1434
    if not fwd_params and not proxy_params:
1435
        usageError("At least one -L or -p option is required.")
1436
1437
    # Prepare the configuration display.
1438
    config_info_lines = []
1439
    title_lst = []
1440
    if fwd_params:
1441
        config_info_lines.extend(map(
1442
            lambda args: 'Forwarding %s:%d -> %s:%d' % args, fwd_params))
1443
        title_lst.extend(map(
1444
            lambda args: '%s:%d -> %s:%d' % args, fwd_params))
1445
    if proxy_params:
1446
        config_info_lines.extend(map(
1447
            lambda args: 'HTTP proxy listening on %s:%d' % args, proxy_params))
1448
        title_lst.extend(map(
1449
            lambda args: '%s:%d -> proxy' % args, proxy_params))
1450
    if split_http:
1451
        config_info_lines.append('HTTP connection splitting enabled.')
1452
    if record_directory:
1453
        config_info_lines.append(
1454
            'Recording to directory %s.' % record_directory)
1455
    config_info = '\n'.join(config_info_lines)
1456
    titlepart = ', '.join(title_lst)
1457
    mainloop = None
1458
1459
    if obs_factory is None:
1460
        # If no observer factory has been specified, use Tkinter.
1461
        obs_factory, mainloop = setupTk(titlepart, config_info, colorized)
1462
1463
    if record_directory:
1464
        def _decorateRecorder(fci, sub_factory=obs_factory,
1465
                              record_directory=record_directory,
1466
                              record_prefix=record_prefix,
1467
                              record_responses=record_responses,
1468
                              record_errors=record_errors):
1469
            return RecordingObserver(fci, sub_factory, record_directory,
1470
                                     record_prefix, record_responses,
1471
                                     record_errors)
1472
        obs_factory = _decorateRecorder
1473
1474
    chosen_factory = obs_factory
1475
    if split_http:
1476
        # Put an HTTPConnectionSplitter between the events and the output.
1477
        def _factory(fci, sub_factory=obs_factory):
1478
            return HTTPConnectionSplitter(sub_factory, fci)
1479
        chosen_factory = _factory
1480
    # obs_factory is the connection observer factory without HTTP
1481
    # connection splitting, while chosen_factory may have connection
1482
    # splitting.  Proxy services use obs_factory rather than the full
1483
    # chosen_factory because proxy services perform connection
1484
    # splitting internally.
1485
1486
    services = []
1487
    try:
1488
        # Start forwarding services.
1489
        for params in fwd_params:
1490
            args = params + (chosen_factory,)
1491
            s = ForwardingService(*args)
1492
            services.append(s)
1493
1494
        # Start proxy services.
1495
        for params in proxy_params:
1496
            args = params + (obs_factory,)
1497
            s = HTTPProxyService(*args)
1498
            services.append(s)
1499
1500
        if show_config:
1501
            sys.stderr.write(config_info + '\n')
1502
1503
        # Run the main loop.
1504
        try:
1505
            if mainloop is not None:
1506
                import thread
1507
                thread.start_new_thread(asyncore.loop, (), {'timeout': 1.0})
1508
                mainloop()
1509
            else:
1510
                asyncore.loop(timeout=1.0)
1511
        except KeyboardInterrupt:
1512
            sys.stderr.write('TCPWatch finished.\n')
1513
    finally:
1514
        for s in services:
1515
            s.close()
1516
1517
1518
if __name__ == '__main__':
    # argv[0] is the program name; main() only parses the options after it.
    command_line = sys.argv[1:]
    main(command_line)