34
36
return threads.values()
40
"""A memory-backed message store."""
42
def __init__(self, messages):
    """Store the message data.

    :param messages: A dict mapping archive_id to a list of message
        dicts for that archive.
    """
    self.messages = messages
46
def get_messages(self, archive_id, query_string):
    """Return matching messages.

    :param archive_id: The archive to retrieve from.
    :param query_string: Contains 'parameters', which is a JSON-format
        string describing parameters.
    :return: A dict with 'messages' (the selected page of messages),
        'next_memo' and 'previous_memo' (rot13-obscured message ids, or
        None at either end of the result set).
    :raises UnsupportedOrder: if the requested order is not one of
        SUPPORTED_ORDERS.
    """
    query = parse_qs(query_string)
    parameters = simplejson.loads(query['parameters'][0])
    order = parameters.get('order')
    messages = self.messages[archive_id]
    if order is not None:
        if order not in SUPPORTED_ORDERS:
            raise UnsupportedOrder
        elif order.startswith('thread_'):
            # Thread orders sort whole threads, then flatten the
            # threads back into a single message list.
            threaded = threaded_messages(messages)
            messages = []
            if order == 'thread_subject':
                threaded.sort(key=lambda t: t[0]['subject'])
            if order == 'thread_oldest':
                threaded.sort(key=lambda t: min(m['date'] for m in t))
            if order == 'thread_newest':
                threaded.sort(key=lambda t: max(m['date'] for m in t))
            for thread in threaded:
                messages.extend(thread)
        else:
            messages.sort(key=lambda m: m[order])
    # Filter and trim individual messages.
    new_messages = []
    for message in messages:
        if (not parameters['include_hidden']
                and message.get('hidden', False)):
            continue
        if ('message_ids' in parameters and
                message['message_id'] not in parameters['message_ids']):
            continue
        # Copy before mutating so the stored message is unchanged.
        message = dict(message)
        if 'headers' in parameters:
            headers = dict(
                (k, v) for k, v in message['headers'].iteritems()
                if k in parameters['headers'])
            message['headers'] = headers
        max_body = parameters.get('max_body_length')
        if max_body is not None:
            message['body'] = message['body'][:max_body]
        new_messages.append(message)
    messages = new_messages
    # Apply memo/limit pagination; memos are rot13-obscured message ids.
    limit = parameters.get('limit', 100)
    memo = parameters.get('memo')
    message_id_indices = dict(
        (m['message_id'], idx) for idx, m in enumerate(messages))
    if memo is None:
        start = 0
        previous_memo = None
    else:
        start = message_id_indices[memo.encode('rot13')]
        if start > 0:
            previous_memo = messages[start - 1]['message_id'].encode(
                'rot13')
        else:
            previous_memo = None
    end = min(start + limit, len(messages))
    if end < len(messages):
        next_memo = messages[end]['message_id'].encode('rot13')
    else:
        next_memo = None
    messages = messages[start:end]
    response = {
        'messages': messages,
        'next_memo': next_memo,
        'previous_memo': previous_memo,
        }
    return response
123
"""A Grackle service fake, as a ContextManager."""
39
def __init__(self, port, messages=None, write_logs=False):
    """Constructor.

    :param port: The tcp port to use.
    :param messages: A dict of lists of dicts representing messages. The
        outer dict represents the archive, the list represents the list of
        messages for that archive.
    :param write_logs: If true, log messages will be written to stdout.
    """
    self.port = port
    self.messages = messages
    # Child pid once forked; NOTE(review): presumably set by __enter__,
    # which is not fully visible in this chunk -- confirm.
    self.pid = None
    # Pipe the forked server uses to tell the parent it is ready
    # (see is_ready).
    self.read_end, self.write_end = os.pipe()
    self.write_logs = write_logs
143
@staticmethod
def from_client(client, messages=None):
    """Instantiate a ForkedFake from the client.

    :param client: The client to provide service for; its port is used
        for the fake server.
    :param messages: A dict of lists of dicts representing messages. The
        outer dict represents the archive, the list represents the list of
        messages for that archive.
    """
    return ForkedFake(client.port, messages)
45
153
def is_ready(self):
    """Tell the parent process that the server is ready for writes."""
    # Any bytes will do; the parent only waits for something to arrive
    # on the read end of the pipe created in __init__.
    os.write(self.write_end, 'asdf')
48
157
def __enter__(self):
160
Fork and start a server in the child. Return when the server is ready
51
164
self.start_server()
56
169
def start_server(self):
    """Start the HTTP server and serve requests until killed.

    Runs in the forked child; never returns normally.
    """
    service = HTTPServer(('', self.port), FakeGrackleRequestHandler)
    service.store = GrackleStore(self.messages)
    # Every message must have a headers dict, even if empty, so the
    # store can filter on headers unconditionally.
    for archive_id, messages in service.store.messages.iteritems():
        for message in messages:
            message.setdefault('headers', {})
    # The listening socket is bound; let the parent proceed.
    self.is_ready()
    if self.write_logs:
        # NOTE(review): write_logs is documented as "stdout" but the
        # visible code uses stderr -- confirm which is intended.
        logging.basicConfig(
            stream=sys.stderr, level=logging.INFO)
    service.serve_forever()
65
182
def __exit__(self, exc_type, exc_val, traceback):
74
191
class FakeGrackleRequestHandler(BaseHTTPRequestHandler):
192
"""A request handler that forwards to server.store."""
194
def __init__(self, *args, **kwargs):
    """Constructor. Sets up logging.

    The logger is bound before delegating, because
    BaseHTTPRequestHandler.__init__ processes the request (and hence may
    call log_message, which reads self.logger) before returning.
    """
    self.logger = logging.getLogger('http')
    BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
76
199
def do_POST(self):
    """Create a message on POST.

    The fake accepts exactly the body 'This is a message' (see
    TestPutMessage) and rejects anything else with 400.
    """
    message = self.rfile.read(int(self.headers['content-length']))
    if message == 'This is a message':
        self.send_response(httplib.CREATED)
        # Terminate the (empty-bodied) success response.
        self.end_headers()
    else:
        self.send_error(httplib.BAD_REQUEST)
210
"""Retrieve a list of messages on GET."""
86
211
scheme, netloc, path, params, query_string, fragments = (
87
212
urlparse(self.path))
88
213
parts = path.split('/')
89
214
if parts[1] == 'archive':
90
self.get_messages(parts[2], query_string)
92
def get_messages(self, archive_id, query_string):
93
query = parse_qs(query_string)
94
parameters = simplejson.loads(query['parameters'][0])
95
order = parameters.get('order')
96
messages = self.server.messages[archive_id]
97
if order is not None :
98
if order not in SUPPORTED_ORDERS:
216
response = self.server.store.get_messages(
217
parts[2], query_string)
218
self.send_response(httplib.OK)
220
self.wfile.write(simplejson.dumps(response))
221
except UnsupportedOrder:
99
222
self.send_response(httplib.BAD_REQUEST)
100
223
self.wfile.write('Unsupported order')
102
elif order.startswith('thread_'):
103
threaded = threaded_messages(messages)
105
if order == 'thread_subject':
106
threaded.sort(key=lambda t: t[0]['subject'])
107
if order == 'thread_oldest':
108
threaded.sort(key=lambda t: min(m['date'] for m in t))
109
if order == 'thread_newest':
110
threaded.sort(key=lambda t: max(m['date'] for m in t))
111
for thread in threaded:
112
messages.extend(thread)
114
messages.sort(key=lambda m: m[order])
115
messages = [m for m in messages
116
if 'message_ids' not in parameters or
117
m['message_id'] in parameters['message_ids']]
118
self.send_response(httplib.OK)
120
limit = parameters.get('limit', 100)
121
memo = parameters.get('memo')
122
message_id_indices = dict(
123
(m['message_id'], idx) for idx, m in enumerate(messages))
127
start = message_id_indices[memo.encode('rot13')]
129
previous_memo = messages[start - 1]['message_id'].encode('rot13')
132
end = min(start + limit, len(messages))
133
if end < len(messages):
134
next_memo = messages[end]['message_id'].encode('rot13')
137
messages = messages[start:end]
139
for message in messages:
140
message = dict(message)
141
if 'headers' in parameters:
143
(k, v) for k, v in message['headers'].iteritems()
144
if k in parameters['headers'])
145
message['headers'] = headers
146
new_messages.append(message)
148
'messages': new_messages,
149
'next_memo': next_memo,
150
'previous_memo': previous_memo
152
self.wfile.write(simplejson.dumps(response))
155
def fake_grackle_service(client, messages=None):
    """Build a ForkedFake serving on the client's port (legacy helper)."""
    return ForkedFake(client.port, messages)
226
def log_message(self, format, *args):
    """Override log_message to use standard Python logging."""
    entry = "%s - - [%s] %s\n" % (
        self.address_string(), self.log_date_time_string(), format % args)
    self.logger.info(entry)
161
233
class TestPutMessage(TestCase):
163
235
def test_put_message(self):
    """A valid body is accepted; any other body raises."""
    client = GrackleClient('localhost', 8436)
    with ForkedFake.from_client(client):
        client.put_message('arch1', 'asdf', StringIO('This is a message'))
        with ExpectedException(Exception, 'wtf'):
            client.put_message('arch1', 'asdf',
                               StringIO('This is not a message'))
237
309
def test_get_messages_thread_subject_order(self):
238
310
client = GrackleClient('localhost', 8439)
239
with fake_grackle_service(client, {'baz': [
311
with ForkedFake.from_client(client, {'baz': [
240
312
{'message_id': 'bar', 'subject': 'y'},
241
313
{'message_id': 'qux', 'subject': 'z'},
242
314
{'message_id': 'foo', 'subject': 'x', 'in_reply_to': 'qux'},
251
323
def test_get_messages_thread_oldest_order(self):
252
324
client = GrackleClient('localhost', 8439)
253
with fake_grackle_service(client, {'baz': [
325
with ForkedFake.from_client(client, {'baz': [
254
326
{'message_id': 'bar', 'date': 'x'},
255
327
{'message_id': 'qux', 'date': 'z'},
256
328
{'message_id': 'foo', 'date': 'y', 'in_reply_to': 'qux'},
265
337
def test_get_messages_thread_newest_order(self):
266
338
client = GrackleClient('localhost', 8439)
267
with fake_grackle_service(client, {'baz': [
339
with ForkedFake.from_client(client, {'baz': [
268
340
{'message_id': 'bar', 'date': 'x'},
269
341
{'message_id': 'qux', 'date': 'w'},
270
342
{'message_id': 'foo', 'date': 'y', 'in_reply_to': 'bar'},
280
352
def test_get_messages_unsupported_order(self):
    """An unknown order is rejected with UnsupportedOrder."""
    client = GrackleClient('localhost', 8439)
    with ForkedFake.from_client(client,
        {'baz': [{'message_id': 'foo', 'date': '2011-03-25'},
                 {'message_id': 'bar', 'date': '2011-03-24'}]}):
        with ExpectedException(UnsupportedOrder, ''):
            client.get_messages('baz', order='nonsense')
288
360
def test_get_messages_headers_no_headers(self):
289
361
client = GrackleClient('localhost', 8440)
290
with fake_grackle_service(client,
362
with ForkedFake.from_client(client,
292
364
{'message_id': 'foo'}
320
392
first_message = response['messages'][0]
321
393
self.assertEqual('foo', first_message['message_id'])
322
394
self.assertEqual({'From': 'me', 'To': 'you'}, first_message['headers'])
396
def test_get_messages_max_body_length(self):
    """Bodies are truncated to max_body_length characters."""
    client = GrackleClient('localhost', 8443)
    with ForkedFake.from_client(client,
        {'baz': [
            {'message_id': 'foo', 'body': u'abcdefghi'}
        ]}):
        response = client.get_messages('baz', max_body_length=3)
        first_message = response['messages'][0]
        self.assertEqual('abc', first_message['body'])
406
def test_include_hidden(self):
    """Hidden messages appear only when include_hidden is true."""
    client = GrackleClient('localhost', 8444)
    with ForkedFake.from_client(client,
        {'baz': [
            {'message_id': 'foo', 'hidden': True},
            {'message_id': 'bar', 'hidden': False}
        ]}):
        response = client.get_messages('baz', include_hidden=True)
        self.assertMessageIDs(['bar', 'foo'], response['messages'])
        response = client.get_messages('baz', include_hidden=False)
        self.assertMessageIDs(['bar'], response['messages'])