842
class PopulateSPRChangelogs(TunableLoop):
843
maximum_chunk_size = 1
845
def __init__(self, log, abort_time=None):
846
super(PopulateSPRChangelogs, self).__init__(log, abort_time)
847
value = getUtility(IMemcacheClient).get('populate-spr-changelogs')
851
self.start_at = value
852
self.finish_at = self.getCandidateSPRs(0).last()
854
def getCandidateSPRs(self, start_at):
855
return IMasterStore(SourcePackageRelease).using(
856
SourcePackageRelease,
857
# Find any SPRFs that have expired (LFA.content IS NULL).
859
SourcePackageReleaseFile,
860
SourcePackageReleaseFile.sourcepackagereleaseID ==
861
SourcePackageRelease.id),
864
And(LibraryFileAlias.id ==
865
SourcePackageReleaseFile.libraryfileID,
866
LibraryFileAlias.content == None)),
867
# And exclude any SPRs that have any expired SPRFs.
869
SourcePackageRelease.id,
870
SourcePackageRelease.id >= start_at,
871
SourcePackageRelease.changelog == None,
872
).group_by(SourcePackageRelease.id).having(
873
Count(LibraryFileAlias) == 0
874
).order_by(SourcePackageRelease.id)
877
return self.start_at > self.finish_at
879
def __call__(self, chunk_size):
880
for sprid in self.getCandidateSPRs(self.start_at)[:chunk_size]:
881
spr = SourcePackageRelease.get(sprid)
882
with TempDir() as tmp_dir:
885
# Grab the files from the librarian into a temporary
888
for sprf in spr.files:
890
tmp_dir.path, sprf.libraryfile.filename)
891
dest_file = open(dest, 'w')
892
sprf.libraryfile.open()
893
copy_and_close(sprf.libraryfile, dest_file)
894
if dest.endswith('.dsc'):
898
'SPR %d (%s %s) has missing library files.' % (
899
spr.id, spr.name, spr.version))
904
'SPR %d (%s %s) has no DSC.' % (
905
spr.id, spr.name, spr.version))
908
# Extract the source package. Throw away stdout/stderr
909
# -- we only really care about the return code.
910
fnull = open('/dev/null', 'w')
911
ret = subprocess.call(
912
['dpkg-source', '-x', dsc_file, os.path.join(
913
tmp_dir.path, 'extracted')],
914
stdout=fnull, stderr=fnull,
915
preexec_fn=subprocess_setup)
919
'SPR %d (%s %s) failed to unpack: returned %d' % (
920
spr.id, spr.name, spr.version, ret))
923
# We have an extracted source package. Let's get the
924
# changelog. findFile ensures that it's not too huge, and
927
changelog_path = findFile(
928
tmp_dir.path, 'debian/changelog')
929
except UploadError, e:
930
changelog_path = None
932
'SPR %d (%s %s) changelog could not be '
934
spr.id, spr.name, spr.version, e))
936
# The LFA should be restricted only if there aren't any
937
# public publications.
938
restricted = not any(
939
not a.private for a in spr.published_archives)
940
spr.changelog = getUtility(ILibraryFileAliasSet).create(
942
os.stat(changelog_path).st_size,
943
open(changelog_path, "r"),
944
"text/x-debian-source-changelog",
945
restricted=restricted)
946
self.log.info('SPR %d (%s %s) changelog imported.' % (
947
spr.id, spr.name, spr.version))
949
self.log.warning('SPR %d (%s %s) had no changelog.' % (
950
spr.id, spr.name, spr.version))
952
self.start_at = spr.id + 1
953
result = getUtility(IMemcacheClient).set(
954
'populate-spr-changelogs', self.start_at)
956
self.log.warning('Failed to set start_at in memcache.')
960
827
class BaseDatabaseGarbageCollector(LaunchpadCronScript):
961
828
"""Abstract base class to run a collection of TunableLoops."""
962
829
script_name = None # Script name for locking and database user. Override.
999
866
% multiprocessing.cpu_count())
1002
start_time = time.time()
869
self.start_time = time.time()
1004
871
# Stores the number of failed tasks.
1005
872
self.failure_count = 0
874
# Copy the list so we can safely consume it.
875
tunable_loops = list(self.tunable_loops)
1007
876
if self.options.experimental:
1008
tunable_loops = list(
1009
self.tunable_loops + self.experimental_tunable_loops)
1011
tunable_loops = list(self.tunable_loops)
1013
a_very_long_time = float(31536000) # 1 year
1014
abort_script = self.options.abort_script or a_very_long_time
1018
"Worker thread %s running.", threading.currentThread().name)
1021
if start_time + abort_script - time.time() <= 0:
1022
# Exit silently. We warn later.
1024
"Worker thread %s detected script timeout.",
1025
threading.currentThread().name)
1028
num_remaining_tasks = len(tunable_loops)
1029
if not num_remaining_tasks:
1031
tunable_loop_class = tunable_loops.pop(0)
1033
loop_name = tunable_loop_class.__name__
1035
# Configure logging for this loop to use a prefix. Log
1036
# output from multiple threads will be interleaved, and
1037
# this lets us tell log output from different tasks
1039
loop_logger = logging.getLogger('garbo.' + loop_name)
1040
loop_logger.addFilter(PrefixFilter(loop_name))
1042
loop_logger.info("Running %s", loop_name)
1044
# How long until the script should abort.
1045
remaining_script_time = (
1046
abort_script + start_time - time.time())
1048
# How long until the task should abort.
1049
if self.options.abort_task is not None:
1050
# Task timeout specified on command line.
1051
abort_task = self.options.abort_task
1053
elif num_remaining_tasks <= self.options.threads:
1054
# We have a thread for every remaining task. Let the
1055
# task run until the script timeout.
1056
self.logger.debug2("Task may run until script timeout.")
1057
abort_task = remaining_script_time
1060
# Evenly distribute the remaining time to the
1063
self.options.threads
1064
* remaining_script_time / num_remaining_tasks)
1066
abort_time = min(abort_task, remaining_script_time)
1068
"Task will be terminated in %0.3f seconds", abort_time)
1070
tunable_loop = tunable_loop_class(
1071
abort_time=abort_time, log=loop_logger)
1073
if self._maximum_chunk_size is not None:
1074
tunable_loop.maximum_chunk_size = self._maximum_chunk_size
1078
loop_logger.debug("%s completed sucessfully.", loop_name)
1080
loop_logger.exception("Unhandled exception")
1081
self.failure_count += 1
877
tunable_loops.extend(self.experimental_tunable_loops)
1086
880
for count in range(0, self.options.threads):
1087
881
thread = threading.Thread(
1088
target=worker, name='Worker-%d' % (count+1,))
882
target=self.run_tasks_in_thread,
883
name='Worker-%d' % (count+1,),
884
args=(tunable_loops,))
1090
886
threads.add(thread)
1095
891
# down when the script timeout is hit, and the extra time is to
1096
892
# give them a chance to clean up.
1097
893
for thread in threads:
1098
time_to_go = start_time + abort_script - time.time() + 60
894
time_to_go = self.get_remaining_script_time() + 60
1099
895
if time_to_go > 0:
1100
896
thread.join(time_to_go)
1104
900
# If the script ran out of time, warn.
1105
if start_time + abort_script - time.time() < 0:
901
if self.get_remaining_script_time() < 0:
1106
902
self.logger.warn(
1107
"Script aborted after %d seconds.", abort_script)
903
"Script aborted after %d seconds.", self.script_timeout)
906
self.logger.warn("%d tasks did not run.", len(tunable_loops))
1109
908
if self.failure_count:
909
self.logger.error("%d tasks failed.", self.failure_count)
1110
910
raise SilentLaunchpadScriptFailure(self.failure_count)
912
def get_remaining_script_time(self):
    """Return the number of seconds left before the script timeout.

    The value is ``self.start_time + self.script_timeout`` minus the
    current ``time.time()``, so it becomes zero or negative once the
    timeout has been reached.
    """
    return self.start_time + self.script_timeout - time.time()
916
# Exposed as a property: the rest of this file reads it as a plain
# attribute (``self.script_timeout``), e.g. in get_remaining_script_time
# and in the "Script aborted after %d seconds." warning.
@property
def script_timeout(self):
    """Script-wide timeout in seconds.

    Returns ``self.options.abort_script`` when it is truthy; otherwise
    falls back to one year, so that arithmetic on the timeout never has
    to special-case "no timeout given".
    """
    a_very_long_time = 31536000  # 1 year
    return self.options.abort_script or a_very_long_time
920
def get_loop_logger(self, loop_name):
    """Retrieve a logger for use by a particular task.

    The logger will be configured to add the loop_name as a
    prefix to all log messages, making interleaved output from
    multiple threads somewhat readable.

    :param loop_name: Name of the task. It is appended to ``garbo.``
        to form the logger name and passed to PrefixFilter.
    :return: The ``garbo.<loop_name>`` logger, with a PrefixFilter
        attached.
    """
    loop_logger = logging.getLogger('garbo.' + loop_name)
    # logging.getLogger() returns the same object for the same name, so
    # only attach a PrefixFilter if an earlier call has not already
    # done so; this avoids stacking filters on repeat calls.
    already_prefixed = any(
        isinstance(existing, PrefixFilter)
        for existing in loop_logger.filters)
    if not already_prefixed:
        loop_logger.addFilter(PrefixFilter(loop_name))
    # Always hand the logger back: the caller assigns the result and
    # logs through it.
    return loop_logger
934
def get_loop_abort_time(self, num_remaining_tasks):
    """Return how many seconds the next task may run before aborting.

    :param num_remaining_tasks: Number of tasks still waiting to run,
        including the one about to start.
    :return: The task timeout in seconds, never more than the time
        remaining before the script timeout.
    """
    # Sample the clock once so the per-task figure and the cap applied
    # to it are derived from the same instant.
    remaining_script_time = self.get_remaining_script_time()

    # How long until the task should abort.
    if self.options.abort_task is not None:
        # Task timeout specified on command line.
        abort_task = self.options.abort_task
    elif num_remaining_tasks <= self.options.threads:
        # We have a thread for every remaining task. Let
        # the task run until the script timeout.
        self.logger.debug2(
            "Task may run until script timeout.")
        abort_task = remaining_script_time
    else:
        # Evenly distribute the remaining time to the
        # remaining tasks.
        abort_task = (
            self.options.threads
            * remaining_script_time / num_remaining_tasks)

    # A task may never outlive the script itself.
    return min(abort_task, remaining_script_time)
956
def run_tasks_in_thread(self, tunable_loops):
957
"""Worker thread target to run tasks.
959
Tasks are removed from tunable_loops and run one at a time,
960
until all tasks that can be run have been run or the script
964
"Worker thread %s running.", threading.currentThread().name)
968
# How long until the script should abort.
969
if self.get_remaining_script_time() <= 0:
970
# Exit silently. We warn later.
972
"Worker thread %s detected script timeout.",
973
threading.currentThread().name)
976
num_remaining_tasks = len(tunable_loops)
977
if not num_remaining_tasks:
979
tunable_loop_class = tunable_loops.pop(0)
981
loop_name = tunable_loop_class.__name__
983
loop_logger = self.get_loop_logger(loop_name)
985
# Acquire a lock for the task. Multiple garbo processes
986
# might be running simultaneously.
987
loop_lock_path = os.path.join(
988
LOCK_PATH, 'launchpad-garbo-%s.lock' % loop_name)
989
# No logger - too noisy, so report issues ourself.
990
loop_lock = GlobalLock(loop_lock_path, logger=None)
993
loop_logger.debug("Aquired lock %s.", loop_lock_path)
994
except LockAlreadyAcquired:
995
# If the lock cannot be acquired, but we have plenty
996
# of time remaining, just put the task back to the
998
if self.get_remaining_script_time() > 60:
1000
"Unable to acquire lock %s. Running elsewhere?",
1002
time.sleep(0.3) # Avoid spinning.
1003
tunable_loops.append(tunable_loop_class)
1004
# Otherwise, emit a warning and skip the task.
1007
"Unable to acquire lock %s. Running elsewhere?",
1012
loop_logger.info("Running %s", loop_name)
1014
abort_time = self.get_loop_abort_time(num_remaining_tasks)
1016
"Task will be terminated in %0.3f seconds",
1019
tunable_loop = tunable_loop_class(
1020
abort_time=abort_time, log=loop_logger)
1022
# Allow the test suite to override the chunk size.
1023
if self._maximum_chunk_size is not None:
1024
tunable_loop.maximum_chunk_size = (
1025
self._maximum_chunk_size)
1030
"%s completed sucessfully.", loop_name)
1032
loop_logger.exception("Unhandled exception")
1033
self.failure_count += 1
1037
loop_logger.debug("Released lock %s.", loop_lock_path)
1113
1041
class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
1114
1042
script_name = 'garbo-hourly'