1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
|
# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
__all__ = [
'DBLoopTuner',
'ITunableLoop',
'LoopTuner',
'TunableLoop',
]
from datetime import timedelta
import sys
import time
import transaction
from zope.component import getUtility
from zope.interface import (
implements,
Interface,
)
import lp.services.scripts
from lp.services.webapp.interfaces import (
IStoreSelector,
MAIN_STORE,
MASTER_FLAVOR,
)
class ITunableLoop(Interface):
"""Interface for self-tuning loop bodies to be driven by LoopTuner.
To construct a self-tuning batched loop, define your loop body as a class
implementing TunableLoop, and pass an instance to your LoopTuner.
"""
def isDone():
"""Is this loop finished?
Once this returns True, the LoopTuner will no longer touch this
object.
"""
def __call__(chunk_size):
"""Perform an iteration of the loop.
The chunk_size parameter says (in some way you define) how much work
the LoopTuner believes you should try to do in this iteration in order
to get as close as possible to your time goal.
Note that chunk_size is a float, so, for example, if you use it to
slice a list, be careful to round it to an int first.
"""
def cleanUp(self):
"""Clean up any open resources.
Optional.
This method is needed because loops may be aborted before
completion, so clean up code in the isDone() method may
never be invoked.
"""
class LoopTuner:
"""A loop that tunes itself to approximate an ideal time per iteration.
Use this for large processing jobs that need to be broken down into chunks
of such size that processing a single chunk takes approximately a given
ideal time. For example, large database operations may have to be
performed and committed in a large number of small steps in order to avoid
locking out other clients that need to access the same data. Regular
commits allow other clients to get their work done.
In such a situation, committing for every step is often far too costly.
Imagine inserting a million rows and committing after every row! You
could hand-pick a static number of steps per commit, but that takes a lot
of experimental guesswork and it will still waste time when things go
well, and on the other hand, it will still end up taking too much time per
batch when the system slows down for whatever reason.
Instead, define your loop body in an ITunableLoop; parameterize it on the
number of steps per batch; say how much time you'd like to spend per
batch; and pass it to a LoopTuner. The LoopTuner will execute your loop,
dynamically tuning its batch-size parameter to stay close to your time
goal. If things go faster than expected, it will ask your loop body to do
more work for the next batch. If a batch takes too much time, the next
batch will be smaller. There is also some cushioning for one-off spikes
and troughs in processing speed.
"""
def __init__(
self, operation, goal_seconds,
minimum_chunk_size=1, maximum_chunk_size=1000000000,
abort_time=None, cooldown_time=None, log=None):
"""Initialize a loop, to be run to completion at most once.
Parameters:
operation: an object implementing the loop body. It must support the
ITunableLoop interface.
goal_seconds: the ideal number of seconds for any one iteration to
take. The algorithm will vary chunk size in order to stick close
to this ideal.
minimum_chunk_size: the smallest chunk size that is reasonable. The
tuning algorithm will never let chunk size sink below this value.
maximum_chunk_size: the largest allowable chunk size. A maximum is
needed even if the ITunableLoop ignores chunk size for whatever
reason, since reaching floating-point infinity would seriously
break the algorithm's arithmetic.
cooldown_time: time (in seconds, float) to sleep between consecutive
operation runs. Defaults to None for no sleep.
abort_time: abort the loop, logging a WARNING message, if the runtime
takes longer than this many seconds.
log: The log object to use. DEBUG level messages are logged
giving iteration statistics.
"""
assert(ITunableLoop.providedBy(operation))
self.operation = operation
self.goal_seconds = float(goal_seconds)
self.minimum_chunk_size = minimum_chunk_size
self.maximum_chunk_size = maximum_chunk_size
self.cooldown_time = cooldown_time
self.abort_time = abort_time
if log is None:
self.log = lp.services.scripts.log
else:
self.log = log
# True if this task has timed out. Set by _isTimedOut().
_has_timed_out = False
def _isTimedOut(self, extra_seconds=0):
"""Return True if the task will be timed out in extra_seconds.
If this method returns True, all future calls will also return
True.
"""
if self.abort_time is None:
return False
if self._has_timed_out:
return True
if self._time() + extra_seconds >= self.start_time + self.abort_time:
self._has_timed_out = True
return True
return False
def run(self):
"""Run the loop to completion."""
# Cleanup function, if we have one.
cleanup = getattr(self.operation, 'cleanUp', lambda: None)
try:
chunk_size = self.minimum_chunk_size
iteration = 0
total_size = 0
self.start_time = self._time()
last_clock = self.start_time
while not self.operation.isDone():
if self._isTimedOut():
self.log.warn(
"Task aborted after %d seconds.", self.abort_time)
break
self.operation(chunk_size)
new_clock = self._time()
time_taken = new_clock - last_clock
last_clock = new_clock
self.log.debug2(
"Iteration %d (size %.1f): %.3f seconds",
iteration, chunk_size, time_taken)
last_clock = self._coolDown(last_clock)
total_size += chunk_size
# Adjust parameter value to approximate goal_seconds.
# The new value is the average of two numbers: the
# previous value, and an estimate of how many rows would
# take us to exactly goal_seconds seconds. The weight in
# this estimate of any given historic measurement decays
# exponentially with an exponent of 1/2. This softens
# the blows from spikes and dips in processing time. Set
# a reasonable minimum for time_taken, just in case we
# get weird values for whatever reason and destabilize
# the algorithm.
time_taken = max(self.goal_seconds / 10, time_taken)
chunk_size *= (1 + self.goal_seconds / time_taken) / 2
chunk_size = max(chunk_size, self.minimum_chunk_size)
chunk_size = min(chunk_size, self.maximum_chunk_size)
iteration += 1
total_time = last_clock - self.start_time
average_size = total_size / max(1, iteration)
average_speed = total_size / max(1, total_time)
self.log.debug2(
"Done. %d items in %d iterations, %3f seconds, "
"average size %f (%s/s)",
total_size, iteration, total_time, average_size,
average_speed)
except Exception:
exc_info = sys.exc_info()
try:
cleanup()
except Exception:
# We need to raise the original exception, but we don't
# want to lose the information about the cleanup
# failure, so log it.
self.log.exception("Unhandled exception in cleanUp")
# Reraise the original exception.
raise exc_info[0], exc_info[1], exc_info[2]
else:
cleanup()
def _coolDown(self, bedtime):
"""Sleep for `self.cooldown_time` seconds, if set.
Assumes that anything the main LoopTuner loop does apart from
doing a chunk of work or sleeping takes zero time.
:param bedtime: Time the cooldown started, i.e. the time the
chunk of real work was completed.
:return: Time when cooldown completed, i.e. the starting time
for a next chunk of work.
"""
if self.cooldown_time is None or self.cooldown_time <= 0.0:
return bedtime
else:
self._sleep(self.cooldown_time)
return self._time()
def _time(self):
"""Monotonic system timer with unit of 1 second.
Overridable so tests can fake processing speeds accurately and without
actually waiting.
"""
return time.time()
def _sleep(self, seconds):
"""Sleep.
If the sleep interval would put us over the tasks timeout,
do nothing.
"""
if not self._isTimedOut(seconds):
time.sleep(seconds)
def timedelta_to_seconds(td):
return 24 * 60 * td.days + td.seconds
class DBLoopTuner(LoopTuner):
"""A LoopTuner that plays well with PostgreSQL and replication.
This LoopTuner blocks when database replication is lagging.
Making updates faster than replication can deal with them is
counter productive and in extreme cases can put the database
into a death spiral. So we don't do that.
This LoopTuner also blocks when there are long running
transactions. Vacuuming is ineffective when there are long
running transactions. We block when long running transactions
have been detected, as it means we have already been bloating
the database for some time and we shouldn't make it worse. Once
the long running transactions have completed, we know the dead
space we have already caused can be cleaned up so we can keep
going.
INFO level messages are logged when the DBLoopTuner blocks in addition
to the DEBUG level messages emitted by the standard LoopTuner.
"""
# We block until replication lag is under this threshold.
acceptable_replication_lag = timedelta(seconds=30) # In seconds.
# We block if there are transactions running longer than this threshold.
long_running_transaction = 30 * 60 # In seconds.
def _blockWhenLagged(self):
"""When database replication lag is high, block until it drops."""
# Lag is most meaningful on the master.
store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
msg_counter = 0
while not self._isTimedOut():
lag = store.execute("SELECT replication_lag()").get_one()[0]
if lag is None or lag <= self.acceptable_replication_lag:
return
# Report just once every 10 minutes to avoid log spam.
msg_counter += 1
if msg_counter % 60 == 1:
self.log.info(
"Database replication lagged %s. "
"Sleeping up to 10 minutes.", lag)
# Don't become a long running transaction!
transaction.abort()
self._sleep(10)
def _blockForLongRunningTransactions(self):
"""If there are long running transactions, block to avoid making
bloat worse."""
if self.long_running_transaction is None:
return
store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
msg_counter = 0
while not self._isTimedOut():
results = list(store.execute("""
SELECT
CURRENT_TIMESTAMP - xact_start,
procpid,
usename,
datname,
current_query
FROM activity()
WHERE xact_start < CURRENT_TIMESTAMP - interval '%f seconds'
AND datname = current_database()
ORDER BY xact_start LIMIT 4
""" % self.long_running_transaction).get_all())
if not results:
break
# Check for long running transactions every 10 seconds, but
# only report every 10 minutes to avoid log spam.
msg_counter += 1
if msg_counter % 60 == 1:
for runtime, procpid, usename, datname, query in results:
self.log.info(
"Blocked on %s old xact %s@%s/%d - %s.",
runtime, usename, datname, procpid, query)
self.log.info("Sleeping for up to 10 minutes.")
# Don't become a long running transaction!
transaction.abort()
self._sleep(10)
def _coolDown(self, bedtime):
"""As per LoopTuner._coolDown, except we always wait until there
is no replication lag.
"""
self._blockForLongRunningTransactions()
self._blockWhenLagged()
if self.cooldown_time is not None and self.cooldown_time > 0.0:
remaining_nap = self._time() - bedtime + self.cooldown_time
if remaining_nap > 0.0:
self._sleep(remaining_nap)
return self._time()
class TunableLoop:
"""A base implementation of `ITunableLoop`."""
implements(ITunableLoop)
goal_seconds = 2
minimum_chunk_size = 1
maximum_chunk_size = None # Override.
cooldown_time = 0
def __init__(self, log, abort_time=None):
self.log = log
self.abort_time = abort_time
def isDone(self):
"""Return True when the TunableLoop is complete."""
raise NotImplementedError(self.isDone)
def run(self):
assert self.maximum_chunk_size is not None, (
"Did not override maximum_chunk_size.")
DBLoopTuner(
self, self.goal_seconds,
minimum_chunk_size=self.minimum_chunk_size,
maximum_chunk_size=self.maximum_chunk_size,
cooldown_time=self.cooldown_time,
abort_time=self.abort_time,
log=self.log).run()
|