~didrocks/unity/altf10

4 by Aaron Bentley
Initial test.
1
from BaseHTTPServer import (
2
    HTTPServer,
3
    BaseHTTPRequestHandler,
4
    )
6 by Aaron Bentley
Use constants.
5
import httplib
4 by Aaron Bentley
Initial test.
6
import os
5 by Aaron Bentley
Actual fake service working.
7
from signal import SIGKILL
4 by Aaron Bentley
Initial test.
8
from StringIO import StringIO
3 by Aaron Bentley
Add test framework.
9
from unittest import TestCase
10
5 by Aaron Bentley
Actual fake service working.
11
from testtools import ExpectedException
12
7 by Aaron Bentley
Fix URLs etc.
13
from grackle.client import GrackleClient
4 by Aaron Bentley
Initial test.
14
15
5 by Aaron Bentley
Actual fake service working.
16
class Forked:
17
7 by Aaron Bentley
Fix URLs etc.
18
    def __init__(self, func_or_method, *args):
5 by Aaron Bentley
Actual fake service working.
19
        self.func_or_method = func_or_method
20
        self.pid = None
7 by Aaron Bentley
Fix URLs etc.
21
        self.args = args
5 by Aaron Bentley
Actual fake service working.
22
23
    def __enter__(self):
24
        pid = os.fork()
25
        if pid != 0:
26
            self.pid = pid
27
            return
7 by Aaron Bentley
Fix URLs etc.
28
        self.func_or_method(*self.args)
5 by Aaron Bentley
Actual fake service working.
29
30
31
    def __exit__(self, exc_type, exc_val, traceback):
32
        os.kill(self.pid, SIGKILL)
33
34
35
class FakeGrackleRequestHandler(BaseHTTPRequestHandler):
36
7 by Aaron Bentley
Fix URLs etc.
37
    def do_PUT(self):
5 by Aaron Bentley
Actual fake service working.
38
        message = self.rfile.read(int(self.headers['content-length']))
39
        if message == 'This is a message':
6 by Aaron Bentley
Use constants.
40
            self.send_response(httplib.CREATED)
5 by Aaron Bentley
Actual fake service working.
41
            self.end_headers()
42
            self.wfile.close()
43
        else:
6 by Aaron Bentley
Use constants.
44
            self.send_error(httplib.BAD_REQUEST)
5 by Aaron Bentley
Actual fake service working.
45
46
7 by Aaron Bentley
Fix URLs etc.
47
def run_service(port):
48
    service = HTTPServer(('', port), FakeGrackleRequestHandler)
5 by Aaron Bentley
Actual fake service working.
49
    service.serve_forever()
50
4 by Aaron Bentley
Initial test.
51
52
3 by Aaron Bentley
Add test framework.
53
class TestPutMessage(TestCase):
54
55
    def test_put_message(self):
7 by Aaron Bentley
Fix URLs etc.
56
        client = GrackleClient('localhost', 8435)
57
        with Forked(run_service, client.port):
58
            client.put_message('arch1', 'asdf', StringIO('This is a message'))
5 by Aaron Bentley
Actual fake service working.
59
            with ExpectedException(Exception, 'wtf'):
7 by Aaron Bentley
Fix URLs etc.
60
                client.put_message('arch1', 'asdf',
61
                    StringIO('This is not a message'))