1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# Copyright 2009 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""The code import dispatcher.
The code import dispatcher is responsible for checking if any code
imports need to be processed and launching child processes to handle
them.
"""
__metaclass__ = type
__all__ = [
'CodeImportDispatcher',
]
import os
import socket
import subprocess
import time
from canonical.config import config
class CodeImportDispatcher:
"""A CodeImportDispatcher kicks off the processing of a job if needed.
The entry point is `findAndDispatchJob`.
:ivar txn: A transaction manager.
:ivar logger: A `Logger` object.
"""
worker_script = os.path.join(
config.root, 'scripts', 'code-import-worker-monitor.py')
def __init__(self, logger, worker_limit, _sleep=time.sleep):
"""Initialize an instance.
:param logger: A `Logger` object.
"""
self.logger = logger
self.worker_limit = worker_limit
self._sleep = _sleep
def getHostname(self):
"""Return the hostname of this machine.
This usually calls `socket.gethostname` but it can be
overridden by the config for tests and developer machines.
"""
if config.codeimportdispatcher.forced_hostname:
return config.codeimportdispatcher.forced_hostname
else:
return socket.gethostname()
def dispatchJob(self, job_id):
"""Start the processing of job `job_id`."""
# Just launch the process and forget about it.
log_file = os.path.join(
config.codeimportdispatcher.worker_log_dir,
'code-import-worker-%d.log' % (job_id,))
# Return the Popen object to make testing easier.
interpreter = "%s/bin/py" % config.root
return subprocess.Popen(
[interpreter, self.worker_script, str(job_id), '-vv',
'--log-file', log_file])
def findAndDispatchJob(self, scheduler_client):
"""Check for and dispatch a job if necessary.
:return: A boolean, true if a job was found and dispatched.
"""
job_id = scheduler_client.getJobForMachine(
self.getHostname(), self.worker_limit)
if job_id == 0:
self.logger.info("No jobs pending.")
return False
self.logger.info("Dispatching job %d." % job_id)
self.dispatchJob(job_id)
return True
def _getSleepInterval(self):
"""How long to sleep for until asking for a new job.
The basic idea is to wait longer if the machine is more heavily
loaded, so that less loaded slaves get a chance to grab some jobs.
We assume worker_limit will be roughly the number of CPUs in the
machine, so load/worker_limit is roughly how loaded the machine is.
"""
return 5*os.getloadavg()[0]/self.worker_limit
def findAndDispatchJobs(self, scheduler_client):
"""Call findAndDispatchJob until no job is found."""
while True:
found = self.findAndDispatchJob(scheduler_client)
if not found:
break
self._sleep(self._getSleepInterval())
|