8
by William Grant
A little bit of server. |
1 |
# Copyright (c) 2012 Canonical Ltd
|
2 |
#
|
|
3 |
# This program is free software: you can redistribute it and/or modify
|
|
4 |
# it under the terms of the GNU Affero General Public License as published by
|
|
5 |
# the Free Software Foundation, either version 3 of the License, or
|
|
6 |
# (at your option) any later version.
|
|
7 |
#
|
|
8 |
# This program is distributed in the hope that it will be useful,
|
|
9 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
10 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
11 |
# GNU Affero General Public License for more details.
|
|
12 |
#
|
|
13 |
# You should have received a copy of the GNU Affero General Public
|
|
14 |
# License along with this program. If not, see
|
|
15 |
# <http://www.gnu.org/licenses/>.
|
|
16 |
||
41
by William Grant
So it turns out that calendar.timegm is a UTC equivalent of time.mktime. |
17 |
import calendar |
8
by William Grant
A little bit of server. |
18 |
import datetime |
9
by William Grant
Extract some message metadata. Log betterer. |
19 |
import email.parser |
20 |
from email.utils import parsedate_tz |
|
37
by William Grant
Some flexibility in formatting. |
21 |
import functools |
9
by William Grant
Extract some message metadata. Log betterer. |
22 |
import logging |
8
by William Grant
A little bit of server. |
23 |
import uuid |
24 |
||
43
by William Grant
Fix dateutil import position. |
25 |
import dateutil.tz |
42
by William Grant
Use a ConnectionPool. pycassa.connect is gone in 1.4.0. |
26 |
import pycassa.pool |
8
by William Grant
A little bit of server. |
27 |
from pycassa.system_manager import ( |
28 |
LEXICAL_UUID_TYPE, |
|
29 |
SystemManager, |
|
30 |
TIME_UUID_TYPE, |
|
31 |
)
|
|
32
by William Grant
Date filtering. |
32 |
from pycassa.util import convert_time_to_uuid |
8
by William Grant
A little bit of server. |
33 |
|
21
by William Grant
Merge grackle.server into grackle. Alter Makefile to run all the tests. |
34 |
from grackle.cassandra import workaround_1779 |
8
by William Grant
A little bit of server. |
35 |
|
36 |
||
19
by William Grant
grackle-create-instance can now create the keyspace too. |
37 |
def create_schema(host, keyspace, clobber=False, create_keyspace=False):
    """Create grackle's column families in a Cassandra keyspace.

    :param host: Cassandra server passed to pycassa's SystemManager.
    :param keyspace: name of the keyspace to populate.
    :param clobber: if true, drop every existing column family in the
        keyspace before creating the new ones.
    :param create_keyspace: if true, create the keyspace itself first,
        with a replication factor of 1.
    """
    mgr = SystemManager(host)
    # Everything after connecting runs inside the try so the manager is
    # closed even when keyspace creation or the clobber step fails.
    try:
        if create_keyspace:
            mgr.create_keyspace(keyspace, replication_factor=1)

        if clobber:
            # Snapshot the names before dropping anything.
            for cf in list(mgr.get_keyspace_column_families(keyspace)):
                mgr.drop_column_family(keyspace, cf)

        # Message bodies and metadata, keyed by a LexicalUUID.
        workaround_1779(
            mgr.create_column_family, keyspace, 'message',
            key_validation_class=LEXICAL_UUID_TYPE)
        # Per-archive index: TimeUUID column names whose values are
        # LexicalUUIDs (the keys of rows in 'message').
        workaround_1779(
            mgr.create_column_family, keyspace, 'archive_message',
            comparator_type=TIME_UUID_TYPE,
            default_validation_class=LEXICAL_UUID_TYPE)
    finally:
        mgr.close()
|
58 |
||
59 |
||
17
by William Grant
Turn _parse_message into a non-member function. |
60 |
def _parse_message(message):
    """Get a date and dict of an RFC822 message.

    :param message: the raw message text.
    :return: a ``(date, message_dict)`` tuple. ``date`` is a timezone-aware
        datetime built from the Date header, or None when the header is
        missing or cannot be parsed. ``message_dict`` holds whichever of
        the 'from', 'to', 'subject' and 'message-id' headers are present,
        plus 'date': the ISO 8601 form of ``date``, or None.
    """
    parsed = email.parser.Parser().parsestr(message)
    message_dict = {}

    for key in ('from', 'to', 'subject', 'message-id'):
        value = parsed.get(key, None)
        if value is not None:
            message_dict[key] = value

    date = None
    raw_date = parsed.get('date')
    if raw_date is not None:
        # parsedate_tz returns None for input it cannot parse.
        pdate = parsedate_tz(raw_date)
        if pdate is not None:
            # Some Python versions report a missing zone as a None offset;
            # treat that as UTC instead of handing None to tzoffset.
            offset = pdate[9] or 0
            try:
                date = datetime.datetime(
                    *pdate[:6],
                    tzinfo=dateutil.tz.tzoffset('', offset))
            except (ValueError, OverflowError):
                # Out-of-range fields (e.g. day 32): treat as no date
                # rather than leaving the raw header string in `date`.
                date = None
    message_dict['date'] = date.isoformat() if date is not None else None

    return date, message_dict
|
82 |
||
83 |
||
31
by William Grant
Use real UTC timestamps in the archive_message TimeUUIDs. pycassa's end up including the local timezone offset :/ |
84 |
def _utc_datetime(dt):
    """Return `dt` converted to UTC.

    `dt` is expected to be timezone-aware; the result carries a
    dateutil tzutc() tzinfo.
    """
    utc_zone = dateutil.tz.tzutc()
    return dt.astimezone(utc_zone)
|
86 |
||
87 |
||
88 |
def _utc_timestamp(dt):
    """Return the aware datetime `dt` as whole seconds since the epoch.

    The value is normalised to UTC first and converted with
    calendar.timegm (the UTC counterpart of time.mktime), so the result
    does not depend on the machine's local timezone.
    """
    as_utc = _utc_datetime(dt)
    utc_tuple = as_utc.timetuple()
    return calendar.timegm(utc_tuple)
31
by William Grant
Use real UTC timestamps in the archive_message TimeUUIDs. pycassa's end up including the local timezone offset :/ |
90 |
|
91 |
||
32
by William Grant
Date filtering. |
92 |
def _utc_timeuuid(dt, lowest_val=True):
    """Return a boundary TimeUUID for the UTC instant of the aware `dt`.

    `lowest_val` is forwarded to pycassa's convert_time_to_uuid to pick
    the lowest (True) or highest (False) TimeUUID for that timestamp.
    """
    seconds = _utc_timestamp(dt)
    return convert_time_to_uuid(seconds, lowest_val)
|
94 |
||
95 |
||
96 |
def _cmp_timeuuid(a, b):
    """Compare two version-1 UUIDs chronologically.

    The embedded timestamps (``UUID.time``) are compared first; only when
    they are equal does the UUIDs' own (integer) ordering break the tie.
    Plain UUID comparison is by integer value, which does not follow
    time order.

    :return: -1, 0 or 1, like Python 2's ``cmp``.
    """
    if a.time != b.time:
        left, right = a.time, b.time
    else:
        left, right = a, b
    # ``cmp`` no longer exists in Python 3; this expression is equivalent.
    return (left > right) - (left < right)
|
100 |
||
101 |
||
102 |
def _bound_timeuuid(a, b, max=False):
    """Return the tighter of two TimeUUID bounds.

    An empty `a` means "no bound yet" and always yields `b`. Otherwise
    `b` is returned only if it is strictly later (max=True) or strictly
    earlier (max=False) than `a` in _cmp_timeuuid order; ties keep `a`.
    """
    if a == '':
        return b
    wanted = 1 if max else -1
    return b if _cmp_timeuuid(b, a) == wanted else a
|
106 |
||
107 |
||
34
by William Grant
Extract _make_bounds. |
108 |
def _make_bounds(memo, range_start, range_finish, backward):
    """Compute the column slice bounds for one batch of archive messages.

    :param memo: '' for no memo, otherwise a UUID string. It becomes the
        slice finish when `backward` is true and the slice start otherwise.
    :param range_start: optional aware datetime; the start bound moves to
        its lowest TimeUUID if that is later than the current start.
    :param range_finish: optional aware datetime; the finish bound moves to
        its highest TimeUUID if that is earlier than the current finish.
    :param backward: whether the memo bounds the end of the slice.
    :return: ``(memo, start, finish)``. memo is a uuid.UUID, or '' if none
        was given; start and finish are UUIDs, or '' for "unbounded".
    """
    has_memo = memo != ''
    if has_memo:
        memo = uuid.UUID(memo)
    start = memo if has_memo and not backward else ''
    finish = memo if has_memo and backward else ''

    if range_start is not None:
        lower = _utc_timeuuid(range_start)
        start = _bound_timeuuid(start, lower, max=True)
    if range_finish is not None:
        upper = _utc_timeuuid(range_finish, lowest_val=False)
        finish = _bound_timeuuid(finish, upper)
    return memo, start, finish
|
123 |
||
124 |
||
37
by William Grant
Some flexibility in formatting. |
125 |
# Header names that _format_message accepts in its `headers` argument.
LEGAL_HEADERS = set(('date', 'from', 'message-id', 'subject'))
|
|
128 |
||
129 |
||
39
by William Grant
Refactor formatters, push headers into a subdict. |
130 |
def _format_message(message, headers=(), include_raw=False):
    """Build the client-facing representation of a stored message.

    :param message: mapping of a stored message's columns (the header
        fields and 'raw' written by CassandraConnection.add_message).
    :param headers: header names to copy into a 'headers' sub-dict; each
        must be in LEGAL_HEADERS. Headers absent from `message` map to None.
    :param include_raw: if true, include the raw message text as 'raw'.
    :return: a dict with an optional 'headers' dict and an optional 'raw'.
    :raises AssertionError: if `headers` names an unsupported header.
    """
    data = {}

    if headers:
        illegal = set(headers).difference(LEGAL_HEADERS)
        # An explicit raise (not `assert`) so the check survives `python -O`.
        if illegal:
            raise AssertionError(
                'Unsupported headers: %s' % ', '.join(sorted(illegal)))
        data['headers'] = dict(
            (header, message.get(header)) for header in headers)

    if include_raw:
        data['raw'] = message['raw']

    return data
144 |
||
145 |
||
8
by William Grant
A little bit of server. |
146 |
class CassandraConnection(object):
    """Read and write grackle messages in a Cassandra keyspace.

    Messages live in the 'message' column family, keyed by a random UUID.
    Each archive has a row in 'archive_message'. Its column names are built
    from the messages' UTC timestamps, and its values are message UUIDs.
    """

    def __init__(self, keyspace, hosts):
        """Open a connection pool and the column families.

        :param keyspace: name of the Cassandra keyspace.
        :param hosts: server list handed to pycassa's ConnectionPool.
        """
        self._keyspace = keyspace
        self._hosts = hosts
        # One pool serves every column family. A second, never-used pool
        # used to be created for `_connection`; keep that attribute as an
        # alias so nothing reading it breaks.
        self._pool = self._connect()
        self._connection = self._pool
        self.messages = self._column_family('message')
        self.archive_messages = self._column_family('archive_message')

    def _connect(self):
        """Return a new pycassa ConnectionPool for this keyspace/hosts."""
        return pycassa.pool.ConnectionPool(self._keyspace, self._hosts)

    def _column_family(self, name):
        """Return a ColumnFamily handle for `name` on the shared pool."""
        return pycassa.ColumnFamily(self._pool, name)

    def add_message(self, archive_uuid, message):
        """Store a raw RFC822 message and index it in an archive.

        :param archive_uuid: row key of the archive in 'archive_message'.
        :param message: the raw message text.
        :return: the new message's UUID (a uuid4).
        """
        message_uuid = uuid.uuid4()
        message_date, message_dict = _parse_message(message)
        message_dict['raw'] = message
        message_dict['date_created'] = (
            datetime.datetime.utcnow().isoformat() + 'Z')
        if message_date is None:
            # No usable Date header. Index by import time; previously this
            # crashed in _utc_timestamp after the message row was written.
            message_date = datetime.datetime.now(dateutil.tz.tzutc())
        self.messages.insert(message_uuid, message_dict)
        self.archive_messages.insert(
            archive_uuid,
            {_utc_timestamp(message_date): message_uuid})
        logging.debug(
            'Imported %s into %s',
            message_dict.get('message-id', None), archive_uuid)
        return message_uuid

    def _trim(self, sequence, end):
        """Return the sequence with one of the ends trimmed.

        :param end: if true, remove the last element. Otherwise remove
            the first.
        """
        if end:
            return sequence[:-1]
        return sequence[1:]

    def get_messages(self, archive_uuid, order, count, memo, backward=False,
                     start_date=None, finish_date=None, format='all',
                     headers=('from', 'date', 'subject', 'message-id')):
        """Return one batch of an archive's messages in date order.

        :param archive_uuid: row key of the archive in 'archive_message'.
        :param order: "date", or "-date" to read the columns in reverse.
        :param count: maximum number of messages in the batch.
        :param memo: '' for the first batch, otherwise a memo string
            returned by a previous call.
        :param backward: if true, the memo bounds the end of the slice
            instead of its start.
        :param start_date: optional aware datetime lower bound.
        :param finish_date: optional aware datetime upper bound.
        :param format: currently unused; the raw text is always included.
        :param headers: header names to include for each message.
        :return: ``(first_memo, messages, last_memo)``. The memos are the
            str() of the first and last index columns in the batch;
            ``(None, [], None)`` when the batch is empty.
        :raises AssertionError: if `order` is not supported.
        """
        if order not in ("date", "-date"):
            raise AssertionError("Unsupported order.")
        column_reversed = order.startswith('-')

        # NOTE(review): _make_bounds ignores `order`. With "-date"
        # (column_reversed) and a date range, the earlier bound probably
        # needs to be the slice finish rather than its start. Confirm.
        memo, start, finish = _make_bounds(
            memo, start_date, finish_date, backward)

        # Get up to n+1 messages from the memo: the last item of the
        # previous batch (because that's where the memo starts) + this
        # batch. list() because the slicing below needs a sequence, which
        # dict views are not on Python 3.
        # NOTE(review): pycassa's ColumnFamily.get raises NotFoundException
        # when no columns match, so the empty-batch branch below may be
        # unreachable. Confirm.
        pairs = list(self.archive_messages.get(
            archive_uuid, column_count=count + 1, column_start=start,
            column_finish=finish, column_reversed=column_reversed).items())

        # NOTE(review): `<=` compares UUIDs by integer value, not by time
        # as _cmp_timeuuid does. Confirm this memo check is intended.
        if pairs and memo and pairs[0][0] <= memo:
            # The memo (from the previous batch) was included in the
            # result. Trim it.
            pairs = self._trim(pairs, False ^ backward)
        elif len(pairs) > count:
            # There was no memo in the result, so the n+1th element is
            # unnecessary. Kill it.
            pairs = self._trim(pairs, True ^ backward)

        if not pairs:
            return (None, [], None)

        assert 0 < len(pairs) <= count

        # We've narrowed down the message references. Fetch the messages.
        message_ids = [message_id for _, message_id in pairs]
        formatter = functools.partial(
            _format_message, headers=headers, include_raw=True)
        # XXX: No need to get all columns. Restrict based on format.
        messages = self.messages.multiget(message_ids)

        return (
            str(pairs[0][0]),
            [formatter(messages[message_id]) for message_id in message_ids],
            str(pairs[-1][0]),
        )
|