~launchpad-pqm/launchpad/devel

Viewing changes to lib/lp/scripts/garbo.py

  • Committer: Ian Booth
  • Date: 2011-04-19 15:10:57 UTC
  • mfrom: (12868 devel)
  • mto: This revision was merged to the branch mainline in revision 12983.
  • Revision ID: ian.booth@canonical.com-20110419151057-he56y6k29c4zeiyk
Merge from trunk

@@ -13,21 +13,19 @@
     datetime,
     timedelta,
     )
-from fixtures import TempDir
 import logging
-import multiprocessing
 import os
-import signal
-import subprocess
 import threading
 import time

+from contrib.glock import (
+    GlobalLock,
+    LockAlreadyAcquired,
+    )
+import multiprocessing
 from psycopg2 import IntegrityError
 import pytz
-from storm.expr import LeftJoin
 from storm.locals import (
-    And,
-    Count,
     Max,
     Min,
     SQL,
@@ -44,14 +42,10 @@
     sqlvalues,
     )
 from canonical.launchpad.database.emailaddress import EmailAddress
-from canonical.launchpad.database.librarian import (
-    LibraryFileAlias,
-    TimeLimitedToken,
-    )
+from canonical.launchpad.database.librarian import TimeLimitedToken
 from canonical.launchpad.database.oauth import OAuthNonce
 from canonical.launchpad.database.openidconsumer import OpenIDConsumerNonce
 from canonical.launchpad.interfaces.emailaddress import EmailAddressStatus
-from canonical.launchpad.interfaces.librarian import ILibraryFileAliasSet
 from canonical.launchpad.interfaces.lpstorm import IMasterStore
 from canonical.launchpad.utilities.looptuner import TunableLoop
 from canonical.launchpad.webapp.interfaces import (
@@ -59,9 +53,6 @@
     MAIN_STORE,
     MASTER_FLAVOR,
     )
-from canonical.librarian.utils import copy_and_close
-from lp.archiveuploader.dscfile import findFile
-from lp.archiveuploader.nascentuploadfile import UploadError
 from lp.bugs.interfaces.bug import IBugSet
 from lp.bugs.model.bug import Bug
 from lp.bugs.model.bugattachment import BugAttachment
@@ -82,14 +73,12 @@
 from lp.registry.model.person import Person
 from lp.services.job.model.job import Job
 from lp.services.log.logger import PrefixFilter
-from lp.services.memcache.interfaces import IMemcacheClient
 from lp.services.scripts.base import (
     LaunchpadCronScript,
+    LOCK_PATH,
     SilentLaunchpadScriptFailure,
     )
 from lp.services.session.model import SessionData
-from lp.soyuz.model.files import SourcePackageReleaseFile
-from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease
 from lp.translations.interfaces.potemplate import IPOTemplateSet
 from lp.translations.model.potranslation import POTranslation

@@ -97,10 +86,6 @@
 ONE_DAY_IN_SECONDS = 24*60*60


-def subprocess_setup():
-    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-
-
 class BulkPruner(TunableLoop):
     """A abstract ITunableLoop base class for simple pruners.

@@ -839,124 +824,6 @@
         self.done = True


-class PopulateSPRChangelogs(TunableLoop):
-    maximum_chunk_size = 1
-
-    def __init__(self, log, abort_time=None):
-        super(PopulateSPRChangelogs, self).__init__(log, abort_time)
-        value = getUtility(IMemcacheClient).get('populate-spr-changelogs')
-        if not value:
-            self.start_at = 0
-        else:
-            self.start_at = value
-        self.finish_at = self.getCandidateSPRs(0).last()
-
-    def getCandidateSPRs(self, start_at):
-        return IMasterStore(SourcePackageRelease).using(
-            SourcePackageRelease,
-            # Find any SPRFs that have expired (LFA.content IS NULL).
-            LeftJoin(
-                SourcePackageReleaseFile,
-                SourcePackageReleaseFile.sourcepackagereleaseID ==
-                    SourcePackageRelease.id),
-            LeftJoin(
-                LibraryFileAlias,
-                And(LibraryFileAlias.id ==
-                    SourcePackageReleaseFile.libraryfileID,
-                    LibraryFileAlias.content == None)),
-            # And exclude any SPRs that have any expired SPRFs.
-            ).find(
-                SourcePackageRelease.id,
-                SourcePackageRelease.id >= start_at,
-                SourcePackageRelease.changelog == None,
-            ).group_by(SourcePackageRelease.id).having(
-                Count(LibraryFileAlias) == 0
-            ).order_by(SourcePackageRelease.id)
-
-    def isDone(self):
-        return self.start_at > self.finish_at
-
-    def __call__(self, chunk_size):
-        for sprid in self.getCandidateSPRs(self.start_at)[:chunk_size]:
-            spr = SourcePackageRelease.get(sprid)
-            with TempDir() as tmp_dir:
-                dsc_file = None
-
-                # Grab the files from the librarian into a temporary
-                # directory.
-                try:
-                    for sprf in spr.files:
-                        dest = os.path.join(
-                            tmp_dir.path, sprf.libraryfile.filename)
-                        dest_file = open(dest, 'w')
-                        sprf.libraryfile.open()
-                        copy_and_close(sprf.libraryfile, dest_file)
-                        if dest.endswith('.dsc'):
-                            dsc_file = dest
-                except LookupError:
-                    self.log.warning(
-                        'SPR %d (%s %s) has missing library files.' % (
-                            spr.id, spr.name, spr.version))
-                    continue
-
-                if dsc_file is None:
-                    self.log.warning(
-                        'SPR %d (%s %s) has no DSC.' % (
-                            spr.id, spr.name, spr.version))
-                    continue
-
-                # Extract the source package. Throw away stdout/stderr
-                # -- we only really care about the return code.
-                fnull = open('/dev/null', 'w')
-                ret = subprocess.call(
-                    ['dpkg-source', '-x', dsc_file, os.path.join(
-                        tmp_dir.path, 'extracted')],
-                        stdout=fnull, stderr=fnull,
-                        preexec_fn=subprocess_setup)
-                fnull.close()
-                if ret != 0:
-                    self.log.warning(
-                        'SPR %d (%s %s) failed to unpack: returned %d' % (
-                            spr.id, spr.name, spr.version, ret))
-                    continue
-
-                # We have an extracted source package. Let's get the
-                # changelog. findFile ensures that it's not too huge, and
-                # not a symlink.
-                try:
-                    changelog_path = findFile(
-                        tmp_dir.path, 'debian/changelog')
-                except UploadError, e:
-                    changelog_path = None
-                    self.log.warning(
-                        'SPR %d (%s %s) changelog could not be '
-                        'imported: %s' % (
-                            spr.id, spr.name, spr.version, e))
-                if changelog_path:
-                    # The LFA should be restricted only if there aren't any
-                    # public publications.
-                    restricted = not any(
-                        not a.private for a in spr.published_archives)
-                    spr.changelog = getUtility(ILibraryFileAliasSet).create(
-                        'changelog',
-                        os.stat(changelog_path).st_size,
-                        open(changelog_path, "r"),
-                        "text/x-debian-source-changelog",
-                        restricted=restricted)
-                    self.log.info('SPR %d (%s %s) changelog imported.' % (
-                        spr.id, spr.name, spr.version))
-                else:
-                    self.log.warning('SPR %d (%s %s) had no changelog.' % (
-                        spr.id, spr.name, spr.version))
-
-        self.start_at = spr.id + 1
-        result = getUtility(IMemcacheClient).set(
-            'populate-spr-changelogs', self.start_at)
-        if not result:
-            self.log.warning('Failed to set start_at in memcache.')
-        transaction.commit()
-
-
 class BaseDatabaseGarbageCollector(LaunchpadCronScript):
     """Abstract base class to run a collection of TunableLoops."""
     script_name = None # Script name for locking and database user. Override.
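
The class removed above follows the same contract as every other garbo task in this file: a TunableLoop exposes isDone() and __call__(chunk_size), and the runner keeps invoking it, adjusting the chunk size as it goes, until isDone() returns True or the task's abort_time is reached. A minimal sketch of that contract, using an illustrative class name that is not part of this diff:

    class ExampleCounterPruner(TunableLoop):
        """Illustrative only: walks ids 0..9 in chunks, then stops."""

        maximum_chunk_size = 1000

        def __init__(self, log, abort_time=None):
            super(ExampleCounterPruner, self).__init__(log, abort_time)
            self.start_at = 0
            self.finish_at = 9

        def isDone(self):
            # The runner stops invoking the loop once this returns True.
            return self.start_at > self.finish_at

        def __call__(self, chunk_size):
            # Do at most chunk_size units of work, record progress, and
            # commit so each chunk is an independent transaction.
            self.start_at += int(chunk_size)
            transaction.commit()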
@@ -999,93 +866,22 @@
             % multiprocessing.cpu_count())

     def main(self):
-        start_time = time.time()
+        self.start_time = time.time()

         # Stores the number of failed tasks.
         self.failure_count = 0

+        # Copy the list so we can safely consume it.
+        tunable_loops = list(self.tunable_loops)
         if self.options.experimental:
-            tunable_loops = list(
-                self.tunable_loops + self.experimental_tunable_loops)
-        else:
-            tunable_loops = list(self.tunable_loops)
-
-        a_very_long_time = float(31536000) # 1 year
-        abort_script = self.options.abort_script or a_very_long_time
-
-        def worker():
-            self.logger.debug(
-                "Worker thread %s running.", threading.currentThread().name)
-            self.login()
-            while True:
-                if start_time + abort_script - time.time() <= 0:
-                    # Exit silently. We warn later.
-                    self.logger.debug(
-                        "Worker thread %s detected script timeout.",
-                        threading.currentThread().name)
-                    break
-
-                num_remaining_tasks = len(tunable_loops)
-                if not num_remaining_tasks:
-                    break
-                tunable_loop_class = tunable_loops.pop(0)
-
-                loop_name = tunable_loop_class.__name__
-
-                # Configure logging for this loop to use a prefix. Log
-                # output from multiple threads will be interleaved, and
-                # this lets us tell log output from different tasks
-                # apart.
-                loop_logger = logging.getLogger('garbo.' + loop_name)
-                loop_logger.addFilter(PrefixFilter(loop_name))
-
-                loop_logger.info("Running %s", loop_name)
-
-                # How long until the script should abort.
-                remaining_script_time = (
-                    abort_script + start_time - time.time())
-
-                # How long until the task should abort.
-                if self.options.abort_task is not None:
-                    # Task timeout specified on command line.
-                    abort_task = self.options.abort_task
-
-                elif num_remaining_tasks <= self.options.threads:
-                    # We have a thread for every remaining task. Let the
-                    # task run until the script timeout.
-                    self.logger.debug2("Task may run until script timeout.")
-                    abort_task = remaining_script_time
-
-                else:
-                    # Evenly distribute the remaining time to the
-                    # remaining tasks.
-                    abort_task = (
-                        self.options.threads
-                        * remaining_script_time / num_remaining_tasks)
-
-                abort_time = min(abort_task, remaining_script_time)
-                self.logger.debug2(
-                    "Task will be terminated in %0.3f seconds", abort_time)
-
-                tunable_loop = tunable_loop_class(
-                    abort_time=abort_time, log=loop_logger)
-
-                if self._maximum_chunk_size is not None:
-                    tunable_loop.maximum_chunk_size = self._maximum_chunk_size
-
-                try:
-                    tunable_loop.run()
-                    loop_logger.debug("%s completed sucessfully.", loop_name)
-                except Exception:
-                    loop_logger.exception("Unhandled exception")
-                    self.failure_count += 1
-                finally:
-                    transaction.abort()
+            tunable_loops.extend(self.experimental_tunable_loops)

         threads = set()
         for count in range(0, self.options.threads):
             thread = threading.Thread(
-                target=worker, name='Worker-%d' % (count+1,))
+                target=self.run_tasks_in_thread,
+                name='Worker-%d' % (count+1,),
+                args=(tunable_loops,))
             thread.start()
             threads.add(thread)

@@ -1095,20 +891,152 @@
         # down when the script timeout is hit, and the extra time is to
         # give them a chance to clean up.
         for thread in threads:
-            time_to_go = start_time + abort_script - time.time() + 60
+            time_to_go = self.get_remaining_script_time() + 60
             if time_to_go > 0:
                 thread.join(time_to_go)
             else:
                 break

         # If the script ran out of time, warn.
-        if start_time + abort_script - time.time() < 0:
+        if self.get_remaining_script_time() < 0:
             self.logger.warn(
-                "Script aborted after %d seconds.", abort_script)
+                "Script aborted after %d seconds.", self.script_timeout)
+
+        if tunable_loops:
+            self.logger.warn("%d tasks did not run.", len(tunable_loops))

         if self.failure_count:
+            self.logger.error("%d tasks failed.", self.failure_count)
             raise SilentLaunchpadScriptFailure(self.failure_count)

+    def get_remaining_script_time(self):
+        return self.start_time + self.script_timeout - time.time()
+
+    @property
+    def script_timeout(self):
+        a_very_long_time = 31536000 # 1 year
+        return self.options.abort_script or a_very_long_time
+
+    def get_loop_logger(self, loop_name):
+        """Retrieve a logger for use by a particular task.
+
+        The logger will be configured to add the loop_name as a
+        prefix to all log messages, making interleaved output from
+        multiple threads somewhat readable.
+        """
+        loop_logger = logging.getLogger('garbo.' + loop_name)
+        for filter in loop_logger.filters:
+            if isinstance(filter, PrefixFilter):
+                return loop_logger # Already have a PrefixFilter attached.
+        loop_logger.addFilter(PrefixFilter(loop_name))
+        return loop_logger
+
+    def get_loop_abort_time(self, num_remaining_tasks):
+        # How long until the task should abort.
+        if self.options.abort_task is not None:
+            # Task timeout specified on command line.
+            abort_task = self.options.abort_task
+
+        elif num_remaining_tasks <= self.options.threads:
+            # We have a thread for every remaining task. Let
+            # the task run until the script timeout.
+            self.logger.debug2(
+                "Task may run until script timeout.")
+            abort_task = self.get_remaining_script_time()
+
+        else:
+            # Evenly distribute the remaining time to the
+            # remaining tasks.
+            abort_task = (
+                self.options.threads
+                * self.get_remaining_script_time() / num_remaining_tasks)
+
+        return min(abort_task, self.get_remaining_script_time())
+
+    def run_tasks_in_thread(self, tunable_loops):
+        """Worker thread target to run tasks.
+
+        Tasks are removed from tunable_loops and run one at a time,
+        until all tasks that can be run have been run or the script
+        has timed out.
+        """
+        self.logger.debug(
+            "Worker thread %s running.", threading.currentThread().name)
+        self.login()
+
+        while True:
+            # How long until the script should abort.
+            if self.get_remaining_script_time() <= 0:
+                # Exit silently. We warn later.
+                self.logger.debug(
+                    "Worker thread %s detected script timeout.",
+                    threading.currentThread().name)
+                break
+
+            num_remaining_tasks = len(tunable_loops)
+            if not num_remaining_tasks:
+                break
+            tunable_loop_class = tunable_loops.pop(0)
+
+            loop_name = tunable_loop_class.__name__
+
+            loop_logger = self.get_loop_logger(loop_name)
+
+            # Aquire a lock for the task. Multiple garbo processes
+            # might be running simultaneously.
+            loop_lock_path = os.path.join(
+                LOCK_PATH, 'launchpad-garbo-%s.lock' % loop_name)
+            # No logger - too noisy, so report issues ourself.
+            loop_lock = GlobalLock(loop_lock_path, logger=None)
+            try:
+                loop_lock.acquire()
+                loop_logger.debug("Aquired lock %s.", loop_lock_path)
+            except LockAlreadyAcquired:
+                # If the lock cannot be acquired, but we have plenty
+                # of time remaining, just put the task back to the
+                # end of the queue.
+                if self.get_remaining_script_time() > 60:
+                    loop_logger.debug3(
+                        "Unable to acquire lock %s. Running elsewhere?",
+                        loop_lock_path)
+                    time.sleep(0.3) # Avoid spinning.
+                    tunable_loops.append(tunable_loop_class)
+                # Otherwise, emit a warning and skip the task.
+                else:
+                    loop_logger.warn(
+                        "Unable to acquire lock %s. Running elsewhere?",
+                        loop_lock_path)
+                continue
+
+            try:
+                loop_logger.info("Running %s", loop_name)
+
+                abort_time = self.get_loop_abort_time(num_remaining_tasks)
+                loop_logger.debug2(
+                    "Task will be terminated in %0.3f seconds",
+                    abort_time)
+
+                tunable_loop = tunable_loop_class(
+                    abort_time=abort_time, log=loop_logger)
+
+                # Allow the test suite to override the chunk size.
+                if self._maximum_chunk_size is not None:
+                    tunable_loop.maximum_chunk_size = (
+                        self._maximum_chunk_size)
+
+                try:
+                    tunable_loop.run()
+                    loop_logger.debug(
+                        "%s completed sucessfully.", loop_name)
+                except Exception:
+                    loop_logger.exception("Unhandled exception")
+                    self.failure_count += 1
+
+            finally:
+                loop_lock.release()
+                loop_logger.debug("Released lock %s.", loop_lock_path)
+                transaction.abort()
+

 class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
     script_name = 'garbo-hourly'
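
The time budgeting that moved into get_loop_abort_time() above is easiest to see with concrete numbers: with 4 worker threads, 10 queued tasks and 600 seconds of script time left, a task gets 4 * 600 / 10 = 240 seconds; once the number of remaining tasks drops to the thread count or below, a task may use all of the remaining script time. A rough standalone rendering of that calculation (the function and parameter names here are illustrative, not Launchpad's):

    def loop_abort_time(remaining_script_time, num_remaining_tasks,
                        num_threads, abort_task=None):
        # Mirrors the branching in get_loop_abort_time() above.
        if abort_task is not None:
            # An explicit per-task timeout from the command line wins.
            pass
        elif num_remaining_tasks <= num_threads:
            # A thread per task: let each run until the script timeout.
            abort_task = remaining_script_time
        else:
            # Spread the remaining script time evenly over the tasks.
            abort_task = (
                num_threads * remaining_script_time / num_remaining_tasks)
        # Never exceed the time left for the whole script.
        return min(abort_task, remaining_script_time)

    # e.g. loop_abort_time(600.0, 10, 4) == 240.0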
@@ -1122,6 +1050,5 @@
         UnusedSessionPruner,
         DuplicateSessionPruner,
         BugHeatUpdater,
-        PopulateSPRChangelogs,
         ]
     experimental_tunable_loops = []
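
One behavioural change worth noting from the new run_tasks_in_thread(): each task now takes a per-loop file lock under LOCK_PATH before it runs, so overlapping garbo invocations can no longer execute the same TunableLoop concurrently; a locked task is requeued while more than a minute of script time remains, and skipped with a warning otherwise. Reduced to its core (requeue/skip handling elided; only the lock file name is taken verbatim from the diff, the rest is a sketch):

    loop_lock_path = os.path.join(
        LOCK_PATH, 'launchpad-garbo-%s.lock' % loop_name)
    loop_lock = GlobalLock(loop_lock_path, logger=None)
    try:
        loop_lock.acquire()
    except LockAlreadyAcquired:
        # Another garbo process owns this task right now; requeue or
        # skip it as described above.
        pass
    else:
        try:
            tunable_loop_class(abort_time=abort_time, log=loop_logger).run()
        finally:
            loop_lock.release()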