79
78
return date, message_dict
82
def _utc_datetime(dt):
83
return dt.astimezone(dateutil.tz.tzutc())
86
def _utc_timestamp(dt):
87
return time.mktime(_utc_datetime(dt).timetuple()) - time.timezone
90
81
class CassandraConnection(object):
92
83
def __init__(self, keyspace, host):
111
102
self.messages.insert(message_uuid, message_dict)
112
103
self.archive_messages.insert(
114
{_utc_timestamp(message_date): message_uuid})
105
{message_date.astimezone(dateutil.tz.tzutc()): message_uuid})
116
107
'Imported %s into %s'
117
108
% (message_dict.get('message-id', None), archive_uuid))
125
116
'message-id': message.get('message-id'),
128
def _trim(self, sequence, end):
129
"""Return the sequence with one of the ends trimmed.
131
:param end: if true, remove the last element. otherwise remove
139
def get_messages(self, archive_uuid, order, count, memo, backward=False):
119
def get_messages(self, archive_uuid, order, count, memo):
140
120
if order in ("date", "-date"):
141
121
reversed = order[0] == '-'
143
123
raise AssertionError("Unsupported order.")
145
125
memo = uuid.UUID(memo)
153
126
# Get up to n+1 messages from the memo: the last item of the
154
127
# previous batch (because that's where the memo starts) + this
156
129
pairs = self.archive_messages.get(
157
archive_uuid, column_count=count + 1, column_start=start,
158
column_finish=finish, column_reversed=reversed).items()
130
archive_uuid, column_count=count + 1,
131
column_start=memo, column_reversed=reversed).items()
160
if len(pairs) and memo and pairs[0][0] <= memo:
161
# The memo (from the previous batch) was included in the result.
163
pairs = self._trim(pairs, False ^ backward)
133
if memo and len(pairs) and pairs[0][0] <= memo:
164
135
elif len(pairs) > count:
165
# There was no memo in the result, so the n+1th element is
166
# unnecessary. Kill it.
167
pairs = self._trim(pairs, True ^ backward)
169
138
if len(pairs) == 0:
170
139
return (None, [], None)
172
141
assert 0 < len(pairs) <= count
174
# We've narrowed down the message references. Fetch the messages.
175
143
ids = [v for k, v in pairs]
176
144
messages = self.messages.multiget(
177
145
ids, columns=['date', 'from', 'subject', 'message-id'])