12622.8.30
by Jonathan Lange
Clarify why we are doing things differently. |
1 |
# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
|
8687.15.22
by Karl Fogel
Add the copyright header block to the remaining .py files. |
2 |
# GNU Affero General Public License version 3 (see the file LICENSE).
|
6805.19.53
by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts. |
3 |
|
4 |
"""Facilities for running Jobs."""
|
|
5 |
||
8963.10.6
by Aaron Bentley
Refactor job running in terms of IRunnableJob. |
6 |
__metaclass__ = type |
7 |
||
10100.1.9
by Jonathan Lange
Export BaseRunnableJob. |
8 |
__all__ = [ |
9 |
'BaseRunnableJob', |
|
13242.1.12
by Aaron Bentley
Fix failing test and lint. |
10 |
'BaseRunnableJobSource', |
7675.624.24
by Tim Penhey
Record the counts of all possible jobs |
11 |
'JobCronScript', |
10100.1.9
by Jonathan Lange
Export BaseRunnableJob. |
12 |
'JobRunner', |
10100.1.21
by Jonathan Lange
Lots of trivial import fixes. |
13 |
'JobRunnerProcess', |
7675.624.52
by Tim Penhey
Lots of logging and a better producer. |
14 |
'TwistedJobRunner', |
10100.1.9
by Jonathan Lange
Export BaseRunnableJob. |
15 |
]
|
6805.19.53
by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts. |
16 |
|
17 |
||
10137.6.10
by Aaron Bentley
Switch from timeouts to deadlines. |
18 |
from calendar import timegm |
7675.624.28
by Tim Penhey
Fix collections. |
19 |
from collections import defaultdict |
9826.11.35
by Aaron Bentley
Add base implementation of contextManager. |
20 |
import contextlib |
10137.6.2
by Aaron Bentley
Fix test to be less timing-dependant. |
21 |
import logging |
9826.13.9
by Aaron Bentley
Update for review. |
22 |
import os |
13242.1.5
by Aaron Bentley
Add memory_limit support to jobs. |
23 |
from resource import ( |
24 |
getrlimit, |
|
25 |
RLIMIT_AS, |
|
26 |
setrlimit, |
|
27 |
)
|
|
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
28 |
from signal import ( |
29 |
SIGHUP, |
|
30 |
signal, |
|
31 |
)
|
|
6805.19.53
by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts. |
32 |
import sys |
7675.1120.25
by Jeroen Vermeulen
Use generic job runner, thanks to StevenK's suggestion; and document the generic job runner a bit. |
33 |
from textwrap import dedent |
6805.19.53
by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts. |
34 |
|
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
35 |
from ampoule import ( |
36 |
child, |
|
37 |
main, |
|
38 |
pool, |
|
39 |
)
|
|
40 |
from lazr.delegates import delegates |
|
10312.4.3
by Aaron Bentley
Reorganize imports |
41 |
import transaction |
14027.3.2
by Jeroen Vermeulen
Merge devel, resolve conflicts. |
42 |
from twisted.internet import reactor |
13785.5.6
by Aaron Bentley
Fake merge of devel r13844 into simplify-twisted-runner-2. |
43 |
from twisted.internet.defer import ( |
13785.5.7
by Aaron Bentley
Switch to inlineCallbacks approach. |
44 |
inlineCallbacks, |
13785.5.6
by Aaron Bentley
Fake merge of devel r13844 into simplify-twisted-runner-2. |
45 |
succeed, |
46 |
)
|
|
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
47 |
from twisted.protocols import amp |
13785.5.7
by Aaron Bentley
Switch to inlineCallbacks approach. |
48 |
from twisted.python import ( |
49 |
failure, |
|
50 |
log, |
|
51 |
)
|
|
9222.1.32
by Aaron Bentley
Split JobCronScript out into job/runner |
52 |
from zope.component import getUtility |
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
53 |
from zope.security.proxy import removeSecurityProxy |
9222.1.32
by Aaron Bentley
Split JobCronScript out into job/runner |
54 |
|
14022.3.1
by William Grant
Replace initZopeless mostly. |
55 |
from canonical.config import ( |
56 |
config, |
|
57 |
dbconfig, |
|
58 |
)
|
|
14565.2.15
by Curtis Hovey
Moved canonical.launchpad.scripts __init__ to lp.services.scripts. |
59 |
from lp.services import scripts |
10312.4.3
by Aaron Bentley
Reorganize imports |
60 |
from canonical.launchpad.webapp import errorlog |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
61 |
from lp.services.job.interfaces.job import ( |
62 |
IJob, |
|
63 |
IRunnableJob, |
|
64 |
LeaseHeld, |
|
13125.2.8
by Julian Edwards
rename the special exception |
65 |
SuspendJobException, |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
66 |
)
|
14022.2.4
by William Grant
Move set_immediate_mail_delivery into initZopeless' callsites, out of initZopeless itself. |
67 |
from lp.services.mail.sendmail import ( |
68 |
MailController, |
|
69 |
set_immediate_mail_delivery, |
|
70 |
)
|
|
10548.1.1
by Jonathan Lange
Move twistedsupport to lp.services |
71 |
from lp.services.scripts.base import LaunchpadCronScript |
13242.1.3
by Aaron Bentley
Extract run_reactor from TwistedJobRunner.runFromSource |
72 |
from lp.services.twistedsupport import run_reactor |
6805.19.53
by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts. |
73 |
|
74 |
||
13242.1.12
by Aaron Bentley
Fix failing test and lint. |
75 |
class BaseRunnableJobSource:
    """Base class for job sources for the job runner.

    Supplies the two defaults that job sources expose: `memory_limit` and
    `contextManager`.
    """

    # None means that no memory limit is requested for jobs from this source.
    memory_limit = None

    @staticmethod
    @contextlib.contextmanager
    def contextManager():
        """Return a context manager that does nothing on entry or exit."""
        yield None
|
|
84 |
||
85 |
||
86 |
class BaseRunnableJob(BaseRunnableJobSource):
    """Base class for jobs to be run via JobRunner.

    Derived classes should implement IRunnableJob, which requires implementing
    IRunnableJob.run. They should have a `job` member which implements IJob.

    Subclasses may provide getOopsRecipients, to send mail about oopses.
    If so, they should also provide getOperationDescription.
    """

    delegates(IJob, 'job')

    # Exception types that the runner reports to the user via
    # notifyUserError instead of recording as an OOPS.
    user_error_types = ()

    # Exception types for which the runner schedules a retry rather than
    # failing the job outright.
    retry_error_types = ()

    # We redefine __eq__ and __ne__ here to prevent the security proxy
    # from mucking up our comparisons in tests and elsewhere.
    def __eq__(self, job):
        """Jobs are equal when they share a class and instance state."""
        naked_job = removeSecurityProxy(job)
        return (
            self.__class__ is naked_job.__class__ and
            self.__dict__ == naked_job.__dict__)

    def __ne__(self, job):
        """The exact negation of `__eq__`."""
        return not (self == job)

    def __lt__(self, job):
        """Order jobs of the same class by their instance state.

        Returns NotImplemented when `job` is of a different class.
        """
        naked_job = removeSecurityProxy(job)
        if self.__class__ is naked_job.__class__:
            return self.__dict__ < naked_job.__dict__
        else:
            return NotImplemented

    def getOopsRecipients(self):
        """Return a list of email-ids to notify about oopses."""
        return self.getErrorRecipients()

    def getOperationDescription(self):
        """Return a description of the operation, for use in error mail."""
        return 'unspecified operation'

    def getErrorRecipients(self):
        """Return a list of email-ids to notify about user errors."""
        return []

    def getOopsMailController(self, oops_id):
        """Return a MailController for notifying people about oopses.

        Return None if there is no-one to notify.

        :param oops_id: The id of the OOPS, quoted in the message body.
        """
        recipients = self.getOopsRecipients()
        # Truthiness rather than len(): a subclass returning None means
        # "nobody to notify" too, instead of raising TypeError.
        if not recipients:
            return None
        subject = 'Launchpad internal error'
        body = (
            'Launchpad encountered an internal error during the following'
            ' operation: %s. It was logged with id %s. Sorry for the'
            ' inconvenience.' % (self.getOperationDescription(), oops_id))
        from_addr = config.canonical.noreply_from_address
        return MailController(from_addr, recipients, subject, body)

    def getUserErrorMailController(self, e):
        """Return a MailController for notifying about user errors.

        Return None if there is no-one to notify.

        :param e: The exception describing the user error; its string form
            is included in the message body.
        """
        recipients = self.getErrorRecipients()
        # See getOopsMailController: None and empty both mean no recipients.
        if not recipients:
            return None
        subject = 'Launchpad error while %s' % self.getOperationDescription()
        body = (
            'Launchpad encountered an error during the following'
            ' operation: %s. %s' % (self.getOperationDescription(), str(e)))
        from_addr = config.canonical.noreply_from_address
        return MailController(from_addr, recipients, subject, body)

    def notifyOops(self, oops):
        """Report this oops.

        :param oops: The OOPS report; only its 'id' entry is used here.
        """
        ctrl = self.getOopsMailController(oops['id'])
        if ctrl is not None:
            ctrl.send()

    def getOopsVars(self):
        """See `IRunnableJob`."""
        return [('job_id', self.job.id)]

    def notifyUserError(self, e):
        """See `IRunnableJob`."""
        ctrl = self.getUserErrorMailController(e)
        if ctrl is not None:
            ctrl.send()
|
178 |
||
8963.10.6
by Aaron Bentley
Refactor job running in terms of IRunnableJob. |
179 |
|
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
180 |
class BaseJobRunner(object):
    """Runner of Jobs.

    Jobs that have been handled are recorded in `completed_jobs` or
    `incomplete_jobs`; OOPS ids passed to `_logOopsId` accumulate in
    `oops_ids`.
    """

    def __init__(self, logger=None, error_utility=None):
        """Create a runner.

        :param logger: Logger to report progress to.  Defaults to the root
            logger.
        :param error_utility: Utility used to record OOPSes.  Defaults to
            errorlog.globalErrorUtility.
        """
        self.completed_jobs = []
        self.incomplete_jobs = []
        if logger is None:
            logger = logging.getLogger()
        self.logger = logger
        if error_utility is None:
            error_utility = errorlog.globalErrorUtility
        self.error_utility = error_utility
        self.oops_ids = []

    def acquireLease(self, job):
        """Try to acquire the lease on `job`.

        A job whose lease is already held is added to `incomplete_jobs`.

        :return: True if the lease was acquired, False if it was held.
        """
        self.logger.debug(
            'Trying to acquire lease for job in state %s', job.status.title)
        try:
            job.acquireLease()
        except LeaseHeld:
            self.logger.info(
                'Could not acquire lease for %s', self.job_str(job))
            self.incomplete_jobs.append(job)
            return False
        return True

    @staticmethod
    def job_str(job):
        """Return a short description of `job`: its class name and IJob id."""
        class_name = job.__class__.__name__
        ijob_id = removeSecurityProxy(job).job.id
        return '%s (ID %d)' % (class_name, ijob_id)

    def runJob(self, job):
        """Attempt to run a job, updating its status as appropriate.

        Exceptions other than SuspendJobException and retryable errors that
        are still within the retry budget are re-raised after the job has
        been marked as failed.
        """
        job = IRunnableJob(job)

        self.logger.info(
            'Running %s in status %s', self.job_str(job), job.status.title)
        job.start()
        transaction.commit()
        do_retry = False
        try:
            try:
                job.run()
            except job.retry_error_types as e:
                # Out of retries: let the outer handler fail the job.
                if job.attempt_count > job.max_retries:
                    raise
                self.logger.exception(
                    "Scheduling retry due to %s.", e.__class__.__name__)
                do_retry = True
        except SuspendJobException:
            self.logger.debug("Job suspended itself")
            job.suspend()
            self.incomplete_jobs.append(job)
        except Exception:
            transaction.abort()
            job.fail()
            # Record the failure.
            transaction.commit()
            self.incomplete_jobs.append(job)
            raise
        else:
            # Commit transaction to update the DB time.
            transaction.commit()
            if do_retry:
                job.queue()
                self.incomplete_jobs.append(job)
            else:
                job.complete()
                self.completed_jobs.append(job)
        # Commit transaction to update job status.
        transaction.commit()

    def runJobHandleError(self, job):
        """Run the specified job, handling errors.

        Most errors will be logged as Oopses.  Errors in the job's
        user_error_types won't; they are reported via notifyUserError.
        The list of complete or incomplete jobs will be updated.

        :return: The OOPS that was recorded, or None if there was none.
        """
        job = IRunnableJob(job)
        with self.error_utility.oopsMessage(dict(job.getOopsVars())):
            try:
                try:
                    self.logger.debug('Running %r', job)
                    self.runJob(job)
                except job.user_error_types as e:
                    self.logger.info(
                        'Job %r failed with user error %r.', job, e)
                    job.notifyUserError(e)
                except Exception:
                    info = sys.exc_info()
                    return self._doOops(job, info)
            except Exception:
                # This only happens if attempting to notify users about
                # errors fails for some reason (like a misconfigured
                # email server).
                self.logger.exception(
                    "Failed to notify users about a failure.")
                info = sys.exc_info()
                # Returning the oops says something went wrong.
                return self.error_utility.raising(info)

    def _doOops(self, job, info):
        """Report an OOPS for the provided job and info.

        :param job: The IRunnableJob whose run failed.
        :param info: The standard sys.exc_info() value.
        :return: the Oops that was reported.
        """
        oops = self.error_utility.raising(info)
        job.notifyOops(oops)
        return oops

    def _logOopsId(self, oops_id):
        """Report oopses by id to the log, and remember the id."""
        if self.logger is not None:
            self.logger.info('Job resulted in OOPS: %s', oops_id)
        self.oops_ids.append(oops_id)
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
301 |
|
302 |
||
303 |
class JobRunner(BaseJobRunner):
    """Runner that works through an iterable of jobs, one after another."""

    def __init__(self, jobs, logger=None):
        """Create a runner for `jobs`.

        :param jobs: An iterable of jobs to run.
        :param logger: Optional logger, passed on to BaseJobRunner.
        """
        BaseJobRunner.__init__(self, logger=logger)
        self.jobs = jobs

    @classmethod
    def fromReady(cls, job_class, logger=None):
        """Return a job runner for all ready jobs of a given class."""
        ready_jobs = job_class.iterReady()
        return cls(ready_jobs, logger)

    @classmethod
    def runFromSource(cls, job_source, dbuser, logger):
        """Run all ready jobs provided by the specified source.

        The dbuser parameter is ignored.

        :return: The runner that ran the jobs.
        """
        context = removeSecurityProxy(job_source.contextManager())
        with context:
            logger.info("Running synchronously.")
            runner = cls.fromReady(job_source, logger)
            runner.runAll()
        return runner

    def runAll(self):
        """Run all the Jobs for this JobRunner."""
        for candidate in self.jobs:
            runnable = IRunnableJob(candidate)
            if self.acquireLease(runnable):
                # Commit transaction to clear the row lock.
                transaction.commit()
                oops = self.runJobHandleError(runnable)
                if oops is not None:
                    self._logOopsId(oops['id'])
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
337 |
|
338 |
||
9826.13.9
by Aaron Bentley
Update for review. |
339 |
class RunJobCommand(amp.Command):
    """AMP command for running the job identified by `job_id`."""

    # The only argument is the id of the job to run.
    arguments = [
        ('job_id', amp.Integer()),
        ]

    # The responder in this module fills `success` with the number of
    # completed jobs and `oops_id` with the OOPS id, or '' if there was none.
    response = [
        ('success', amp.Integer()),
        ('oops_id', amp.String()),
        ]
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
343 |
|
344 |
||
12269.4.10
by Aaron Bentley
Cleanup. |
345 |
def import_source(job_source_name):
    """Return the IJobSource specified by its full name.

    :param job_source_name: Dotted name; everything before the last dot is
        imported as a module and the final component is looked up on it.
    :return: The named attribute of the imported module.
    """
    module_path, attribute = job_source_name.rsplit('.', 1)
    # A non-empty fromlist makes __import__ return the module named by
    # module_path itself rather than its top-level package.
    imported = __import__(module_path, fromlist=[attribute])
    return getattr(imported, attribute)
|
350 |
||
351 |
||
9826.13.9
by Aaron Bentley
Update for review. |
352 |
class JobRunnerProcess(child.AMPChild):
    """Base class for processes that run jobs."""

    def __init__(self, job_source_name, dbuser):
        """Set up the child for one job source.

        :param job_source_name: Full dotted name of the job source, resolved
            with import_source.
        :param dbuser: Database user, stored on the class for __enter__.
        """
        child.AMPChild.__init__(self)
        self.job_source = import_source(job_source_name)
        self.context_manager = self.job_source.contextManager()
        # icky, but it's really a global value anyhow.
        self.__class__.dbuser = dbuser

    @classmethod
    def __enter__(cls):
        """Prepare the process: ZCML, SIGHUP handler, DB user and mail."""

        def handler(signum, frame):
            # Two things happen here: an exception is raised **and** a hard
            # exit of the process is scheduled.  The exception alone is not
            # enough, because it may be raised while the reactor is looping
            # rather than during useful code, in which case it is ignored.
            #
            # When the exception does land in the job itself, the result is
            # a useful traceback showing what timed out, and that gets
            # logged as an OOPS.
            #
            # Wherever the exception lands, the process still exits hard and
            # a TimeoutError OOPS is logged, albeit with a poor traceback.
            # The other half of this lives in the job_raised callback in
            # TwistedJobRunner.runJobInSubprocess.
            reactor.callFromThread(
                reactor.callLater, 0, os._exit, TwistedJobRunner.TIMEOUT_CODE)
            raise TimeoutError

        scripts.execute_zcml_for_scripts(use_web_security=False)
        signal(SIGHUP, handler)
        dbconfig.override(dbuser=cls.dbuser, isolation_level='read_committed')
        # XXX wgrant 2011-09-24 bug=29744: initZopeless used to do this.
        # Should be removed from callsites verified to not need it.
        set_immediate_mail_delivery(True)

    @staticmethod
    def __exit__(exc_type, exc_val, exc_tb):
        """Nothing to undo on exit."""
        pass

    def makeConnection(self, transport):
        """The Job context is entered on connect."""
        child.AMPChild.makeConnection(self, transport)
        self.context_manager.__enter__()

    def connectionLost(self, reason):
        """The Job context is left on disconnect."""
        self.context_manager.__exit__(None, None, None)
        child.AMPChild.connectionLost(self, reason)

    @RunJobCommand.responder
    def runJobCommand(self, job_id):
        """Run a job from this job_source according to its job id.

        :return: A dict with 'success' (number of completed jobs) and
            'oops_id' (the OOPS id, or '' if no OOPS was reported).
        """
        runner = BaseJobRunner()
        job = self.job_source.get(job_id)
        wanted_limit = self.job_source.memory_limit
        if wanted_limit is not None:
            soft_limit, hard_limit = getrlimit(RLIMIT_AS)
            # Only touch the soft limit, and only when it actually differs.
            if soft_limit != wanted_limit:
                setrlimit(RLIMIT_AS, (wanted_limit, hard_limit))
        oops = runner.runJobHandleError(job)
        oops_id = '' if oops is None else oops['id']
        return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
419 |
|
420 |
||
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
421 |
class TwistedJobRunner(BaseJobRunner): |
9826.12.9
by Aaron Bentley
Update docs. |
422 |
"""Run Jobs via twisted."""
|
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
423 |
|
12622.8.25
by Jonathan Lange
Make both tests pass reliably |
424 |
TIMEOUT_CODE = 42 |
425 |
||
10372.1.1
by Aaron Bentley
Run with the designated database user. |
426 |
def __init__(self, job_source, dbuser, logger=None, error_utility=None): |
7675.845.15
by Edwin Grubbs
Better cron script. |
427 |
env = {'PATH': os.environ['PATH']} |
13785.5.11
by Aaron Bentley
Inject current sys.path into subprocesses. |
428 |
if 'LPCONFIG' in os.environ: |
429 |
env['LPCONFIG'] = os.environ['LPCONFIG'] |
|
430 |
env['PYTHONPATH'] = os.pathsep.join(sys.path) |
|
431 |
starter = main.ProcessStarter(env=env) |
|
9826.13.9
by Aaron Bentley
Update for review. |
432 |
super(TwistedJobRunner, self).__init__(logger, error_utility) |
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
433 |
self.job_source = job_source |
13626.4.1
by Danilo Segan
Log Job ID when running jobs. |
434 |
self.import_name = '%s.%s' % ( |
10137.6.13
by Aaron Bentley
Ensure update_preview_diffs doesn't require UpdatePreviewDiffJob.amp |
435 |
removeSecurityProxy(job_source).__module__, job_source.__name__) |
10137.6.9
by Aaron Bentley
Parameterize ProcessPool instead of using HUPProcessPool. |
436 |
self.pool = pool.ProcessPool( |
13626.4.1
by Danilo Segan
Log Job ID when running jobs. |
437 |
JobRunnerProcess, ampChildArgs=[self.import_name, str(dbuser)], |
10372.1.1
by Aaron Bentley
Run with the designated database user. |
438 |
starter=starter, min=0, timeout_signal=SIGHUP) |
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
439 |
|
440 |
def runJobInSubprocess(self, job):
    """Hand a single job over to the worker process pool.

    The job is adapted to `IRunnableJob` and a lease is acquired on it
    before any work is scheduled.  If the lease cannot be acquired the job
    is skipped.

    :param job: The job to run; it is adapted to `IRunnableJob`.
    :return: a Deferred that fires when the job has completed.  If no
        lease could be acquired, the result of `succeed(None)` is returned
        instead and nothing is scheduled.
    """
    runnable = IRunnableJob(job)
    if not self.acquireLease(runnable):
        return succeed(None)

    # Commit transaction to clear the row lock.
    transaction.commit()
    job_id = runnable.id
    # The worker is given an absolute deadline (seconds since the epoch)
    # derived from the lease expiry, rather than a relative timeout.
    lease_deadline = timegm(runnable.lease_expires.timetuple())

    # Log the job class and database ID for debugging purposes.
    self.logger.info(
        'Running %s.' % self.job_str(runnable))
    self.logger.debug(
        'Running %r, lease expires %s', runnable, runnable.lease_expires)

    pending = self.pool.doWork(
        RunJobCommand, job_id=job_id, _deadline=lease_deadline)

    def on_response(response):
        """Record the outcome reported back by the worker."""
        if response is None:
            # The worker gave no answer at all; count the job as
            # incomplete and stop here, there is no response to inspect.
            self.incomplete_jobs.append(runnable)
            self.logger.debug('No response for %r', runnable)
            return
        if not response['success']:
            self.incomplete_jobs.append(runnable)
            self.logger.debug('Incomplete %r', runnable)
            # Kill the worker that experienced a failure; this only
            # works because there's a single worker.
            self.pool.stopAWorker()
        else:
            self.completed_jobs.append(runnable)
            self.logger.debug('Finished %r', runnable)
        oops_id = response['oops_id']
        if oops_id != '':
            self._logOopsId(oops_id)

    def on_failure(failure):
        """Record a job whose subprocess call raised or died."""
        self.incomplete_jobs.append(runnable)
        exit_code = getattr(failure.value, 'exitCode', None)
        if exit_code == self.TIMEOUT_CODE:
            # The process ended with the error code that we have
            # arbitrarily chosen to indicate a timeout.  Rather than log
            # that error (ProcessDone), we log a TimeoutError instead.
            self._logTimeout(runnable)
            return
        exc_info = (failure.type, failure.value, failure.tb)
        report = self._doOops(runnable, exc_info)
        self._logOopsId(report['id'])

    pending.addCallbacks(on_response, on_failure)
    return pending
9826.11.9
by Aaron Bentley
Switch to PollingTaskConsumer and refactor. |
492 |
|
12622.8.25
by Jonathan Lange
Make both tests pass reliably |
493 |
def _logTimeout(self, job): |
494 |
try: |
|
495 |
raise TimeoutError |
|
496 |
except TimeoutError: |
|
497 |
oops = self._doOops(job, sys.exc_info()) |
|
13660.1.1
by Robert Collins
Use the write method from python-oops. |
498 |
self._logOopsId(oops['id']) |
12622.8.25
by Jonathan Lange
Make both tests pass reliably |
499 |
|
13785.5.7
by Aaron Bentley
Switch to inlineCallbacks approach. |
500 |
@inlineCallbacks
def runAll(self):
    """Run every ready job from the job source, one after another.

    The process pool is started, each job yielded by
    `self.job_source.iterReady()` is run via `runJobInSubprocess` and
    waited for, and finally `terminated` is called.  An error while
    running jobs is passed to `failed`; an error escaping even that is
    re-raised after `terminated` has been called.
    """
    self.pool.start()
    try:
        try:
            # Stays None only if the job source yielded nothing at all.
            last_job = None
            for last_job in self.job_source.iterReady():
                # Wait for this job to finish before taking the next one.
                yield self.runJobInSubprocess(last_job)
            if last_job is None:
                self.logger.info('No jobs to run.')
            self.terminated()
        except:
            # Wrap whatever is currently being handled in a Failure and
            # let `failed` report it.
            self.failed(failure.Failure())
    except:
        # Something went wrong in the shutdown/failure path itself.
        self.terminated()
        raise
|
|
9826.11.15
by Aaron Bentley
Start trying to use the ampoule process pool. |
517 |
|
518 |
def terminated(self, ignored=None):
    """Stop the process pool, then stop the reactor.

    Usable as a Deferred callback as well as a plain call.

    :param ignored: Unused; accepted so this can be used as a callback.
    """
    def stop_reactor(_result):
        # Runs whether stopping the pool succeeded or failed.
        return reactor.stop()

    self.pool.stop().addBoth(stop_reactor)
522 |
||
523 |
def failed(self, failure):
    """Report a failure, then shut down.

    :param failure: The failure to report; its `printTraceback` method is
        called.
    """
    # Report first, so the traceback is emitted before shutdown begins.
    failure.printTraceback()
    self.terminated()
9222.1.32
by Aaron Bentley
Split JobCronScript out into job/runner |
527 |
|
9826.11.10
by Aaron Bentley
Allow JobCronScript to be parameterized by runner, remove TwistedJobCronSript |
528 |
@classmethod
def runFromSource(cls, job_source, dbuser, logger, _log_twisted=False):
    """Run all ready jobs provided by the specified source.

    A runner is built from the arguments, its `runAll` is scheduled to
    start once the reactor is running, and the reactor is run.

    The dbuser parameter is not ignored.

    :param job_source: Source of the jobs to run; passed to the runner.
    :param dbuser: Database user; passed to the runner.
    :param logger: Logger used for progress messages; passed to the
        runner.
    :param _log_twisted: For debugging: If True, emit verbose Twisted
        messages to stderr.
    :return: The runner, after `run_reactor()` has returned.
    """
    logger.info("Running through Twisted.")
    if _log_twisted:
        twisted_logger_name = 'twistedjobrunner'
        # Open up the root logger so that no Twisted message is filtered.
        logging.getLogger().setLevel(logging.NOTSET)
        logging.getLogger(twisted_logger_name).addHandler(
            logging.StreamHandler(sys.stderr))
        # Route Twisted's own log events into the Python logger above.
        twisted_observer = log.PythonLoggingObserver(
            loggerName=twisted_logger_name)
        log.startLoggingWithObserver(twisted_observer.emit)
    job_runner = cls(job_source, dbuser, logger)
    reactor.callWhenRunning(job_runner.runAll)
    run_reactor()
    return job_runner
549 |
||
9222.1.32
by Aaron Bentley
Split JobCronScript out into job/runner |
550 |
|
551 |
class JobCronScript(LaunchpadCronScript):
    """Generic job runner.

    :ivar config_name: Optional name of a configuration section that specifies
        the jobs to run.  Alternatively, may be taken from the command line.
    :ivar source_interface: `IJobSource`-derived utility to iterate pending
        jobs of the type that is to be run.
    """

    config_name = None

    usage = dedent("""\
        run_jobs.py [options] [lazr-configuration-section]

        Run Launchpad Jobs of one particular type.

        The lazr configuration section specifies what jobs to run, and how.
        It should provide at least:

         * source_interface, the name of the IJobSource-derived utility
           interface for the job type that you want to run.

         * dbuser, the name of the database role to run the job under.
        """).rstrip()

    description = (
        "Takes pending jobs of the given type off the queue and runs them.")

    def __init__(self, runner_class=JobRunner, test_args=None, name=None,
                 commandline_config=False):
        """Initialize a `JobCronScript`.

        :param runner_class: The runner class to use.  Defaults to
            `JobRunner`, which runs synchronously, but could also be
            `TwistedJobRunner` which is asynchronous.
        :param test_args: For tests: pretend that this list of arguments has
            been passed on the command line.
        :param name: Identifying name for this type of job.  Is also used to
            compose a lock file name.
        :param commandline_config: If True, take configuration from the
            command line (in the form of a config section name).  Otherwise,
            rely on the subclass providing `config_name` and
            `source_interface`.  When True and no section name was given on
            the command line, a usage error is reported through the option
            parser.
        """
        # Set before calling the base constructor: `add_my_options` reads
        # `runner_class`, and `self.options` is available right after the
        # base constructor returns.
        self._runner_class = runner_class
        super(JobCronScript, self).__init__(
            name=name, dbuser=None, test_args=test_args)
        self.log_twisted = getattr(self.options, 'log_twisted', False)
        if not commandline_config:
            return
        if not self.args:
            # Report a proper usage error rather than failing with a bare
            # IndexError on self.args[0].
            self.parser.error(
                "Please specify the lazr configuration section to use.")
        self.config_name = self.args[0]
        self.source_interface = import_source(
            self.config_section.source_interface)

    def add_my_options(self):
        """Offer --log-twisted only when running under `TwistedJobRunner`."""
        if self.runner_class is TwistedJobRunner:
            self.add_log_twisted_option()

    def add_log_twisted_option(self):
        """Register the --log-twisted command-line flag."""
        self.parser.add_option(
            '--log-twisted', action='store_true', default=False,
            help='Enable extra Twisted logging.')

    @property
    def dbuser(self):
        """The database user named by the configuration section."""
        return self.config_section.dbuser

    @property
    def runner_class(self):
        """Enable subclasses to override with command-line arguments."""
        return self._runner_class

    def job_counts(self, jobs):
        """Return a list of tuples containing the job name and counts.

        :param jobs: An iterable of job objects.
        :return: A sorted list of (class name, count) tuples.
        """
        counts = defaultdict(int)
        for job in jobs:
            counts[job.__class__.__name__] += 1
        return sorted(counts.items())

    @property
    def config_section(self):
        """The section of `config` named by `config_name`."""
        return getattr(config, self.config_name)

    def main(self):
        """Run all ready jobs from the source, then log per-class totals."""
        section = self.config_section
        if (getattr(section, 'error_dir', None) is not None
            and getattr(section, 'oops_prefix', None) is not None):
            # If the two variables are not set, we will let the error
            # utility default to using the [error_reports] config.
            errorlog.globalErrorUtility.configure(self.config_name)
        job_source = getUtility(self.source_interface)
        # Only pass _log_twisted when it was requested.
        kwargs = {}
        if self.log_twisted:
            kwargs['_log_twisted'] = True
        runner = self.runner_class.runFromSource(
            job_source, self.dbuser, self.logger, **kwargs)
        for name, count in self.job_counts(runner.completed_jobs):
            self.logger.info('Ran %d %s jobs.', count, name)
        for name, count in self.job_counts(runner.incomplete_jobs):
            self.logger.info('%d %s jobs did not complete.', count, name)
|
9826.11.42
by Aaron Bentley
Terminate jobs with SIGHUP |
651 |
|
652 |
||
653 |
class TimeoutError(Exception):
    """Error signalling that a job ran too long."""

    def __init__(self):
        super(TimeoutError, self).__init__("Job ran too long.")