8
by William Grant
A little bit of server. |
1 |
# Copyright (c) 2012 Canonical Ltd
|
2 |
#
|
|
3 |
# This program is free software: you can redistribute it and/or modify
|
|
4 |
# it under the terms of the GNU Affero General Public License as published by
|
|
5 |
# the Free Software Foundation, either version 3 of the License, or
|
|
6 |
# (at your option) any later version.
|
|
7 |
#
|
|
8 |
# This program is distributed in the hope that it will be useful,
|
|
9 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
10 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
11 |
# GNU Affero General Public License for more details.
|
|
12 |
#
|
|
13 |
# You should have received a copy of the GNU Affero General Public
|
|
14 |
# License along with this program. If not, see
|
|
15 |
# <http://www.gnu.org/licenses/>.
|
|
16 |
||
17 |
import datetime |
|
9
by William Grant
Extract some message metadata. Log betterer. |
18 |
import dateutil.tz |
19 |
import email.parser |
|
20 |
from email.utils import parsedate_tz |
|
21 |
import logging |
|
8
by William Grant
A little bit of server. |
22 |
import uuid |
23 |
||
24 |
import pycassa |
|
25 |
from pycassa.system_manager import ( |
|
26 |
LEXICAL_UUID_TYPE, |
|
27 |
SystemManager, |
|
28 |
TIME_UUID_TYPE, |
|
29 |
)
|
|
30 |
||
31 |
from grackle.server.cassandra import workaround_1779 |
|
32 |
||
33 |
||
34 |
def create_schema(host, keyspace, clobber=False):
    """Create grackle's column families in ``keyspace``.

    Two column families are created: ``message``, whose row keys are
    lexical UUIDs, and ``archive_message``, whose column names are time
    UUIDs and whose values are lexical UUIDs.  The keyspace itself is not
    created here.

    :param host: Server address handed to pycassa's ``SystemManager``.
    :param keyspace: Name of the keyspace to populate.
    :param clobber: If true, first drop every column family that already
        exists in ``keyspace``.
    """
    mgr = SystemManager(host)
    # Everything that talks to the server sits inside the try so the
    # manager is closed even if dropping an old column family fails.
    try:
        if clobber:
            for cf in mgr.get_keyspace_column_families(keyspace):
                mgr.drop_column_family(keyspace, cf)
        # workaround_1779 comes from grackle.server.cassandra; presumably it
        # works around upstream Cassandra issue 1779 -- confirm there.
        workaround_1779(
            mgr.create_column_family, keyspace, 'message',
            key_validation_class=LEXICAL_UUID_TYPE)
        workaround_1779(
            mgr.create_column_family, keyspace, 'archive_message',
            comparator_type=TIME_UUID_TYPE,
            default_validation_class=LEXICAL_UUID_TYPE)
    finally:
        mgr.close()
|
52 |
||
53 |
||
54 |
class CassandraConnection(object):
    """Store and retrieve archived messages in a Cassandra keyspace.

    Two column families are used: ``message`` holds one row per message,
    keyed by a random UUID, and ``archive_message`` holds one row per
    archive, mapping each message's date to its message UUID so that an
    archive can be paged through in date order.
    """

    def __init__(self, keyspace, host):
        self._keyspace = keyspace
        self._host = host
        self._connection = self._connect()
        self.messages = self._column_family('message')
        self.archive_messages = self._column_family('archive_message')

    def _connect(self):
        """Open the pycassa connection used by every column family."""
        return pycassa.connect(self._keyspace, self._host)

    def _column_family(self, name):
        """Return a pycassa ColumnFamily named ``name`` on our connection."""
        return pycassa.ColumnFamily(self._connection, name)

    @staticmethod
    def _parse_date(value):
        """Parse an RFC 2822 Date header value into an aware datetime.

        :param value: The header text, or None if the header was absent.
        :return: A timezone-aware ``datetime``, or None when ``value`` is
            None or cannot be turned into a valid date.
        """
        if value is None:
            return None
        fields = parsedate_tz(value)
        if fields is None:
            # parsedate_tz signals an unparseable string by returning None.
            return None
        # Element 9 is the UTC offset in seconds. It may be None when the
        # header has no usable zone; treat that as UTC instead of failing.
        offset = fields[9] if fields[9] is not None else 0
        try:
            date = datetime.datetime(
                *fields[:6], tzinfo=dateutil.tz.tzoffset('', offset))
            # datetime.utcoffset() rejects offsets of a day or more, which
            # parsedate_tz can produce from junk such as "+9999". Checking
            # here stops isoformat()/astimezone() from failing later.
            date.utcoffset()
        except (ValueError, OverflowError):
            return None
        return date

    def _parse_message(self, message):
        """Get a date and dict of an RFC822 message.

        :param message: The raw message text.
        :return: ``(date, message_dict)``. ``date`` is an aware datetime
            from the Date header, or None if that header is missing or
            unusable. ``message_dict`` holds the raw text under
            ``'content'``, whichever of the From, To, Subject and
            Message-ID headers are present, and ``'date'`` as an ISO 8601
            string only when a date was parsed.
        """
        parsed = email.parser.Parser().parsestr(message)
        message_dict = {'content': message}
        for key in ('from', 'to', 'subject', 'message-id'):
            value = parsed.get(key)
            if value is not None:
                message_dict[key] = value

        date = self._parse_date(parsed.get('date'))
        # Like the other headers, only store a date that is actually known.
        if date is not None:
            message_dict['date'] = date.isoformat()
        return date, message_dict

    def add_message(self, archive_uuid, message):
        """Store ``message`` and file it under ``archive_uuid``.

        :param archive_uuid: Row key of the archive in ``archive_message``.
        :param message: The raw RFC822 message text.
        :return: The newly generated UUID of the stored message.
        """
        message_uuid = uuid.uuid4()
        message_date, message_dict = self._parse_message(message)
        created = datetime.datetime.utcnow()
        message_dict['date_created'] = created.isoformat() + 'Z'
        if message_date is None:
            # Without a usable Date header, order the message in its archive
            # by the time it was imported.
            message_date = created.replace(tzinfo=dateutil.tz.tzutc())
        self.messages.insert(message_uuid, message_dict)
        self.archive_messages.insert(
            archive_uuid,
            {message_date.astimezone(dateutil.tz.tzutc()): message_uuid})
        logging.debug(
            'Imported %s into %s',
            message_dict.get('message-id'), archive_uuid)
        return message_uuid

    def _format_message(self, message):
        """Reduce a stored message row to its date, sender and subject."""
        return {
            'date': message.get('date'),
            'from': message.get('from'),
            'subject': message.get('subject'),
        }

    def get_messages(self, archive_uuid, order, count, start):
        """Return one page of an archive's messages.

        :param archive_uuid: Row key of the archive in ``archive_message``.
        :param order: ``"date"`` to walk the archive's columns forwards or
            ``"-date"`` to walk them in reverse.
        :param count: Maximum number of messages to return.
        :param start: Column to start from, passed to pycassa as
            ``column_start``; a memo returned by a previous call resumes
            from where that page stopped.
        :return: ``(messages, next_memo)``: a list of dicts with ``date``,
            ``from`` and ``subject`` keys, and the memo for the next page,
            or None when there are no more messages.
        :raises AssertionError: If ``order`` is not supported.
        """
        if order not in ("date", "-date"):
            raise AssertionError("Unsupported order.")
        column_reversed = order.startswith('-')
        # Fetch one column more than requested: if it exists there is a
        # further page, and its name is where that page starts.
        pairs = list(self.archive_messages.get(
            archive_uuid, column_count=count + 1,
            column_start=start, column_reversed=column_reversed).items())
        if len(pairs) > count:
            assert len(pairs) == count + 1
            next_memo = str(pairs[count][0])
            pairs = pairs[:count]
        else:
            next_memo = None
        ids = [message_uuid for _, message_uuid in pairs]
        # Only load the messages actually being returned.
        if ids:
            rows = self.messages.multiget(
                ids, columns=['date', 'from', 'subject'])
        else:
            rows = {}
        return (
            [self._format_message(rows[message_uuid])
             for message_uuid in ids],
            next_memo,
        )
|