4
by Aaron Bentley
Initial test. |
1 |
from BaseHTTPServer import ( |
2 |
HTTPServer, |
|
3 |
BaseHTTPRequestHandler, |
|
4 |
)
|
|
6
by Aaron Bentley
Use constants. |
5 |
import httplib |
4
by Aaron Bentley
Initial test. |
6 |
import os |
5
by Aaron Bentley
Actual fake service working. |
7 |
from signal import SIGKILL |
4
by Aaron Bentley
Initial test. |
8 |
from StringIO import StringIO |
3
by Aaron Bentley
Add test framework. |
9 |
from unittest import TestCase |
10 |
||
5
by Aaron Bentley
Actual fake service working. |
11 |
from testtools import ExpectedException |
12 |
||
8
by Aaron Bentley
Test message path. |
13 |
from grackle.client import ( |
14 |
GrackleClient, |
|
15 |
)
|
|
4
by Aaron Bentley
Initial test. |
16 |
|
17 |
||
5
by Aaron Bentley
Actual fake service working. |
18 |
class Forked(object):
    """Context manager that runs a callable in a forked child process.

    On entry the process forks.  The parent records the child's pid and
    carries on into the ``with`` body; the child calls
    ``func_or_method(*args)`` and then terminates.  On exit the parent
    kills the child with SIGKILL and reaps it.
    """

    def __init__(self, func_or_method, *args):
        """Remember the callable and the positional arguments to call it with.

        :param func_or_method: Callable to run in the child process.
        :param args: Positional arguments passed to ``func_or_method``.
        """
        self.func_or_method = func_or_method
        self.pid = None
        self.args = args

    def __enter__(self):
        """Fork; return ``self`` in the parent, run the callable in the child.

        The child never returns from this method: it always ends with
        ``os._exit`` so it cannot fall through into the caller's ``with``
        body and duplicate the parent's work.
        """
        pid = os.fork()
        if pid != 0:
            # Parent: keep the pid so __exit__ can stop the child.
            self.pid = pid
            return self
        # Child: run the callable, then leave without unwinding the
        # parent's stack (no atexit handlers, no enclosing finally blocks).
        status = 1
        try:
            self.func_or_method(*self.args)
            status = 0
        except BaseException:
            import traceback
            traceback.print_exc()
        finally:
            os._exit(status)

    def __exit__(self, exc_type, exc_val, traceback):
        """Kill the child process and wait for it so no zombie is left.

        Exceptions raised in the ``with`` body are not suppressed.
        """
        pid, self.pid = self.pid, None
        if pid is None:
            # Nothing was forked (or it was already cleaned up).
            return
        # A child that has already exited is still a zombie until reaped,
        # so the kill succeeds either way.
        os.kill(pid, SIGKILL)
        os.waitpid(pid, 0)
|
35 |
||
36 |
||
37 |
class FakeGrackleRequestHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the fake Grackle service used by the tests."""

    def do_PUT(self):
        """Handle a PUT request.

        Reads ``content-length`` bytes of request body.  A body equal to
        'This is a message' is answered with 201 Created; anything else
        is answered with a 400 Bad Request error.
        """
        length = int(self.headers['content-length'])
        body = self.rfile.read(length)
        if body != 'This is a message':
            self.send_error(httplib.BAD_REQUEST)
            return
        self.send_response(httplib.CREATED)
        self.end_headers()
        self.wfile.close()
5
by Aaron Bentley
Actual fake service working. |
47 |
|
48 |
||
7
by Aaron Bentley
Fix URLs etc. |
49 |
def run_service(port):
    """Serve the fake Grackle handler on ``port`` until the process dies.

    Binds an HTTPServer to all interfaces (empty host) and blocks in
    ``serve_forever``.

    :param port: TCP port number to listen on.
    """
    address = ('', port)
    HTTPServer(address, FakeGrackleRequestHandler).serve_forever()
52 |
||
4
by Aaron Bentley
Initial test. |
53 |
|
54 |
||
3
by Aaron Bentley
Add test framework. |
55 |
class TestPutMessage(TestCase):
    """Tests for GrackleClient.put_message against the fake service."""

    def test_put_message(self):
        """The expected body is accepted; any other body raises.

        The fake service runs in a forked child for the duration of the
        test, listening on the client's port.
        """
        client = GrackleClient('localhost', 8435)
        service = Forked(run_service, client.port)
        with service:
            accepted = StringIO('This is a message')
            client.put_message('arch1', 'asdf', accepted)
            rejected = StringIO('This is not a message')
            with ExpectedException(Exception, 'wtf'):
                client.put_message('arch1', 'asdf', rejected)