~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/scripts/garbo.py

Fix lint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
 
1
# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
3
 
4
4
"""Database garbage collection."""
14
14
    timedelta,
15
15
    )
16
16
import logging
17
 
import multiprocessing
18
17
import os
19
18
import threading
20
19
import time
23
22
    GlobalLock,
24
23
    LockAlreadyAcquired,
25
24
    )
 
25
import multiprocessing
26
26
from psycopg2 import IntegrityError
27
27
import pytz
28
 
from storm.expr import In
29
28
from storm.locals import (
30
29
    Max,
31
30
    Min,
37
36
 
38
37
from canonical.config import config
39
38
from canonical.database import postgresql
40
 
from canonical.database.constants import UTC_NOW
41
39
from canonical.database.sqlbase import (
42
40
    cursor,
43
41
    session_store,
44
42
    sqlvalues,
45
43
    )
 
44
from canonical.launchpad.database.emailaddress import EmailAddress
 
45
from canonical.launchpad.database.librarian import TimeLimitedToken
 
46
from canonical.launchpad.database.oauth import OAuthNonce
 
47
from canonical.launchpad.database.openidconsumer import OpenIDConsumerNonce
 
48
from canonical.launchpad.interfaces.emailaddress import EmailAddressStatus
 
49
from canonical.launchpad.interfaces.lpstorm import IMasterStore
 
50
from canonical.launchpad.utilities.looptuner import TunableLoop
46
51
from canonical.launchpad.webapp.interfaces import (
47
52
    IStoreSelector,
48
53
    MAIN_STORE,
49
54
    MASTER_FLAVOR,
50
55
    )
51
 
from lp.answers.model.answercontact import AnswerContact
52
56
from lp.bugs.interfaces.bug import IBugSet
53
57
from lp.bugs.model.bug import Bug
54
58
from lp.bugs.model.bugattachment import BugAttachment
 
59
from lp.bugs.model.bugmessage import BugMessage
55
60
from lp.bugs.model.bugnotification import BugNotification
56
61
from lp.bugs.model.bugwatch import BugWatchActivity
57
62
from lp.bugs.scripts.checkwatches.scheduler import (
67
72
    )
68
73
from lp.hardwaredb.model.hwdb import HWSubmission
69
74
from lp.registry.model.person import Person
70
 
from lp.services.database.lpstorm import IMasterStore
71
 
from lp.services.identity.interfaces.account import AccountStatus
72
 
from lp.services.identity.interfaces.emailaddress import EmailAddressStatus
73
 
from lp.services.identity.model.emailaddress import EmailAddress
74
75
from lp.services.job.model.job import Job
75
 
from lp.services.librarian.model import TimeLimitedToken
76
76
from lp.services.log.logger import PrefixFilter
77
 
from lp.services.looptuner import TunableLoop
78
 
from lp.services.oauth.model import OAuthNonce
79
 
from lp.services.openid.model.openidconsumer import OpenIDConsumerNonce
80
 
from lp.services.propertycache import cachedproperty
81
77
from lp.services.scripts.base import (
82
78
    LaunchpadCronScript,
83
79
    LOCK_PATH,
84
80
    SilentLaunchpadScriptFailure,
85
81
    )
86
82
from lp.services.session.model import SessionData
87
 
from lp.services.verification.model.logintoken import LoginToken
88
83
from lp.translations.interfaces.potemplate import IPOTemplateSet
89
 
from lp.translations.model.potmsgset import POTMsgSet
90
84
from lp.translations.model.potranslation import POTranslation
91
 
from lp.translations.model.translationmessage import TranslationMessage
92
 
from lp.translations.model.translationtemplateitem import (
93
 
    TranslationTemplateItem,
94
 
    )
95
 
 
96
 
 
97
 
ONE_DAY_IN_SECONDS = 24 * 60 * 60
 
85
 
 
86
 
 
87
ONE_DAY_IN_SECONDS = 24*60*60
98
88
 
99
89
 
100
90
class BulkPruner(TunableLoop):
189
179
        self.store.execute("CLOSE %s" % self.cursor_name)
190
180
 
191
181
 
192
 
class LoginTokenPruner(BulkPruner):
193
 
    """Remove old LoginToken rows.
194
 
 
195
 
    After 1 year, they are useless even for archaeology.
196
 
    """
197
 
    target_table_class = LoginToken
198
 
    ids_to_prune_query = """
199
 
        SELECT id FROM LoginToken WHERE
200
 
        created < CURRENT_TIMESTAMP - CAST('1 year' AS interval)
201
 
        """
202
 
 
203
 
 
204
182
class POTranslationPruner(BulkPruner):
205
183
    """Remove unlinked POTranslation entries.
206
184
 
210
188
    ids_to_prune_query = """
211
189
        SELECT POTranslation.id AS id FROM POTranslation
212
190
        EXCEPT (
213
 
            SELECT msgstr0 FROM TranslationMessage
 
191
            SELECT potranslation FROM POComment
 
192
 
 
193
            UNION ALL SELECT msgstr0 FROM TranslationMessage
214
194
                WHERE msgstr0 IS NOT NULL
215
195
 
216
196
            UNION ALL SELECT msgstr1 FROM TranslationMessage
310
290
        """
311
291
 
312
292
 
313
 
class BugSummaryJournalRollup(TunableLoop):
314
 
    """Rollup BugSummaryJournal rows into BugSummary."""
315
 
    maximum_chunk_size = 5000
316
 
 
317
 
    def __init__(self, log, abort_time=None):
318
 
        super(BugSummaryJournalRollup, self).__init__(log, abort_time)
319
 
        self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
320
 
 
321
 
    def isDone(self):
322
 
        has_more = self.store.execute(
323
 
            "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)"
324
 
            ).get_one()[0]
325
 
        return not has_more
326
 
 
327
 
    def __call__(self, chunk_size):
328
 
        chunk_size = int(chunk_size + 0.5)
329
 
        self.store.execute(
330
 
            "SELECT bugsummary_rollup_journal(%s)", (chunk_size,),
331
 
            noresult=True)
332
 
        self.store.commit()
333
 
 
334
 
 
335
293
class OpenIDConsumerNoncePruner(TunableLoop):
336
294
    """An ITunableLoop to prune old OpenIDConsumerNonce records.
337
295
 
338
296
    We remove all OpenIDConsumerNonce records older than 1 day.
339
297
    """
340
 
    maximum_chunk_size = 6 * 60 * 60  # 6 hours in seconds.
 
298
    maximum_chunk_size = 6*60*60 # 6 hours in seconds.
341
299
 
342
300
    def __init__(self, log, abort_time=None):
343
301
        super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time)
643
601
        self.max_offset = self.store.execute(
644
602
            "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0]
645
603
        if self.max_offset is None:
646
 
            self.max_offset = -1  # Trigger isDone() now.
 
604
            self.max_offset = -1 # Trigger isDone() now.
647
605
            self.log.debug("No Person records to remove.")
648
606
        else:
649
607
            self.log.info("%d Person records to remove." % self.max_offset)
709
667
        """
710
668
 
711
669
 
712
 
class AnswerContactPruner(BulkPruner):
713
 
    """Remove old answer contacts which are no longer required.
714
 
 
715
 
    Remove a person as an answer contact if:
716
 
      their account has been deactivated for more than one day, or
717
 
      suspended for more than one week.
718
 
    """
719
 
    target_table_class = AnswerContact
720
 
    ids_to_prune_query = """
721
 
        SELECT DISTINCT AnswerContact.id
722
 
        FROM AnswerContact, Person, Account
723
 
        WHERE
724
 
            AnswerContact.person = Person.id
725
 
            AND Person.account = Account.id
726
 
            AND (
727
 
                (Account.date_status_set <
728
 
                CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
729
 
                - CAST('1 day' AS interval)
730
 
                AND Account.status = %s)
731
 
                OR
732
 
                (Account.date_status_set <
733
 
                CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
734
 
                - CAST('7 days' AS interval)
735
 
                AND Account.status = %s)
736
 
            )
737
 
        """ % (AccountStatus.DEACTIVATED.value, AccountStatus.SUSPENDED.value)
738
 
 
739
 
 
740
670
class BranchJobPruner(BulkPruner):
741
671
    """Prune `BranchJob`s that are in a final state and more than a month old.
742
672
 
754
684
        """
755
685
 
756
686
 
 
687
class MirrorBugMessageOwner(TunableLoop):
 
688
    """Mirror BugMessage.owner from Message.
 
689
 
 
690
    Only needed until they are all set, after that triggers will maintain it.
 
691
    """
 
692
 
 
693
    # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the
 
694
    # max we want to hold a DB lock open for.
 
695
    minimum_chunk_size = 1000
 
696
    maximum_chunk_size = 5000
 
697
 
 
698
    def __init__(self, log, abort_time=None):
 
699
        super(MirrorBugMessageOwner, self).__init__(log, abort_time)
 
700
        self.store = IMasterStore(BugMessage)
 
701
        self.isDone = IMasterStore(BugMessage).find(
 
702
            BugMessage, BugMessage.ownerID==None).is_empty
 
703
 
 
704
    def __call__(self, chunk_size):
 
705
        """See `ITunableLoop`."""
 
706
        transaction.begin()
 
707
        updated = self.store.execute("""update bugmessage set
 
708
            owner=message.owner from message where
 
709
            bugmessage.message=message.id and bugmessage.id in
 
710
                (select id from bugmessage where owner is NULL limit %s);"""
 
711
            % int(chunk_size)
 
712
            ).rowcount
 
713
        self.log.debug("Updated %s bugmessages." % updated)
 
714
        transaction.commit()
 
715
 
 
716
 
757
717
class BugHeatUpdater(TunableLoop):
758
718
    """A `TunableLoop` for bug heat calculations."""
759
719
 
760
 
    maximum_chunk_size = 5000
 
720
    maximum_chunk_size = 1000
761
721
 
762
722
    def __init__(self, log, abort_time=None, max_heat_age=None):
763
723
        super(BugHeatUpdater, self).__init__(log, abort_time)
791
751
 
792
752
        See `ITunableLoop`.
793
753
        """
794
 
        chunk_size = int(chunk_size + 0.5)
 
754
        # We multiply chunk_size by 1000 for the sake of doing updates
 
755
        # quickly.
 
756
        chunk_size = int(chunk_size * 1000)
 
757
 
 
758
        transaction.begin()
795
759
        outdated_bugs = self._outdated_bugs[:chunk_size]
796
 
        # We don't use outdated_bugs.set() here to work around
797
 
        # Storm Bug #820290.
798
 
        outdated_bug_ids = [bug.id for bug in outdated_bugs]
799
 
        self.log.debug("Updating heat for %s bugs", len(outdated_bug_ids))
800
 
        IMasterStore(Bug).find(
801
 
            Bug, Bug.id.is_in(outdated_bug_ids)).set(
802
 
                heat=SQL('calculate_bug_heat(Bug.id)'),
803
 
                heat_last_updated=UTC_NOW)
 
760
        self.log.debug("Updating heat for %s bugs" % outdated_bugs.count())
 
761
        outdated_bugs.set(
 
762
            heat=SQL('calculate_bug_heat(Bug.id)'),
 
763
            heat_last_updated=datetime.now(pytz.utc))
 
764
 
804
765
        transaction.commit()
805
766
 
806
767
 
841
802
class OldTimeLimitedTokenDeleter(TunableLoop):
842
803
    """Delete expired url access tokens from the session DB."""
843
804
 
844
 
    maximum_chunk_size = 24 * 60 * 60  # 24 hours in seconds.
 
805
    maximum_chunk_size = 24*60*60 # 24 hours in seconds.
845
806
 
846
807
    def __init__(self, log, abort_time=None):
847
808
        super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time)
898
859
        self.done = True
899
860
 
900
861
 
901
 
class UnusedPOTMsgSetPruner(TunableLoop):
902
 
    """Cleans up unused POTMsgSets."""
903
 
 
904
 
    done = False
905
 
    offset = 0
906
 
    maximum_chunk_size = 50000
907
 
 
908
 
    def isDone(self):
909
 
        """See `TunableLoop`."""
910
 
        return self.offset >= len(self.msgset_ids_to_remove)
911
 
 
912
 
    @cachedproperty
913
 
    def msgset_ids_to_remove(self):
914
 
        """Return the IDs of the POTMsgSets to remove."""
915
 
        query = """
916
 
            -- Get all POTMsgSet IDs which are obsolete (sequence == 0)
917
 
            -- and are not used (sequence != 0) in any other template.
918
 
            SELECT POTMsgSet
919
 
              FROM TranslationTemplateItem tti
920
 
              WHERE sequence=0 AND
921
 
              NOT EXISTS(
922
 
                SELECT id
923
 
                  FROM TranslationTemplateItem
924
 
                  WHERE potmsgset = tti.potmsgset AND sequence != 0)
925
 
            UNION
926
 
            -- Get all POTMsgSet IDs which are not referenced
927
 
            -- by any of the templates (they must have TTI rows for that).
928
 
            (SELECT POTMsgSet.id
929
 
               FROM POTMsgSet
930
 
               LEFT OUTER JOIN TranslationTemplateItem
931
 
                 ON TranslationTemplateItem.potmsgset = POTMsgSet.id
932
 
               WHERE
933
 
                 TranslationTemplateItem.potmsgset IS NULL);
934
 
            """
935
 
        store = IMasterStore(POTMsgSet)
936
 
        results = store.execute(query)
937
 
        ids_to_remove = [id for (id,) in results.get_all()]
938
 
        return ids_to_remove
939
 
 
940
 
    def __call__(self, chunk_size):
941
 
        """See `TunableLoop`."""
942
 
        # We cast chunk_size to an int to avoid issues with slicing
943
 
        # (DBLoopTuner passes in a float).
944
 
        chunk_size = int(chunk_size)
945
 
        msgset_ids_to_remove = (
946
 
            self.msgset_ids_to_remove[self.offset:][:chunk_size])
947
 
        # Remove related TranslationTemplateItems.
948
 
        store = IMasterStore(POTMsgSet)
949
 
        related_ttis = store.find(
950
 
            TranslationTemplateItem,
951
 
            In(TranslationTemplateItem.potmsgsetID, msgset_ids_to_remove))
952
 
        related_ttis.remove()
953
 
        # Remove related TranslationMessages.
954
 
        related_translation_messages = store.find(
955
 
            TranslationMessage,
956
 
            In(TranslationMessage.potmsgsetID, msgset_ids_to_remove))
957
 
        related_translation_messages.remove()
958
 
        store.find(
959
 
            POTMsgSet, In(POTMsgSet.id, msgset_ids_to_remove)).remove()
960
 
        self.offset = self.offset + chunk_size
961
 
        transaction.commit()
962
 
 
963
 
 
964
862
class BaseDatabaseGarbageCollector(LaunchpadCronScript):
965
863
    """Abstract base class to run a collection of TunableLoops."""
966
 
    script_name = None  # Script name for locking and database user. Override.
967
 
    tunable_loops = None  # Collection of TunableLoops. Override.
968
 
    continue_on_failure = False  # If True, an exception in a tunable loop
969
 
                                 # does not cause the script to abort.
 
864
    script_name = None # Script name for locking and database user. Override.
 
865
    tunable_loops = None # Collection of TunableLoops. Override.
 
866
    continue_on_failure = False # If True, an exception in a tunable loop
 
867
                                # does not cause the script to abort.
970
868
 
971
869
    # Default run time of the script in seconds. Override.
972
870
    default_abort_script_time = None
1017
915
        for count in range(0, self.options.threads):
1018
916
            thread = threading.Thread(
1019
917
                target=self.run_tasks_in_thread,
1020
 
                name='Worker-%d' % (count + 1,),
 
918
                name='Worker-%d' % (count+1,),
1021
919
                args=(tunable_loops,))
1022
920
            thread.start()
1023
921
            threads.add(thread)
1051
949
 
1052
950
    @property
1053
951
    def script_timeout(self):
1054
 
        a_very_long_time = 31536000  # 1 year
 
952
        a_very_long_time = 31536000 # 1 year
1055
953
        return self.options.abort_script or a_very_long_time
1056
954
 
1057
955
    def get_loop_logger(self, loop_name):
1064
962
        loop_logger = logging.getLogger('garbo.' + loop_name)
1065
963
        for filter in loop_logger.filters:
1066
964
            if isinstance(filter, PrefixFilter):
1067
 
                return loop_logger  # Already have a PrefixFilter attached.
 
965
                return loop_logger # Already have a PrefixFilter attached.
1068
966
        loop_logger.addFilter(PrefixFilter(loop_name))
1069
967
        return loop_logger
1070
968
 
1136
1034
                    loop_logger.debug3(
1137
1035
                        "Unable to acquire lock %s. Running elsewhere?",
1138
1036
                        loop_lock_path)
1139
 
                    time.sleep(0.3)  # Avoid spinning.
 
1037
                    time.sleep(0.3) # Avoid spinning.
1140
1038
                    tunable_loops.append(tunable_loop_class)
1141
1039
                # Otherwise, emit a warning and skip the task.
1142
1040
                else:
1175
1073
                transaction.abort()
1176
1074
 
1177
1075
 
1178
 
class FrequentDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
1179
 
    """Run every 5 minutes.
1180
 
 
1181
 
    This may become even more frequent in the future.
1182
 
 
1183
 
    Jobs with low overhead can go here to distribute work more evenly.
1184
 
    """
1185
 
    script_name = 'garbo-frequently'
 
1076
class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
 
1077
    script_name = 'garbo-hourly'
1186
1078
    tunable_loops = [
1187
 
        BugSummaryJournalRollup,
 
1079
        MirrorBugMessageOwner,
1188
1080
        OAuthNoncePruner,
1189
1081
        OpenIDConsumerNoncePruner,
1190
1082
        OpenIDConsumerAssociationPruner,
1191
 
        AntiqueSessionPruner,
1192
 
        ]
1193
 
    experimental_tunable_loops = []
1194
 
 
1195
 
    # 5 minmutes minus 20 seconds for cleanup. This helps ensure the
1196
 
    # script is fully terminated before the next scheduled hourly run
1197
 
    # kicks in.
1198
 
    default_abort_script_time = 60 * 5 - 20
1199
 
 
1200
 
 
1201
 
class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
1202
 
    """Run every hour.
1203
 
 
1204
 
    Jobs we want to run fairly often but have noticable overhead go here.
1205
 
    """
1206
 
    script_name = 'garbo-hourly'
1207
 
    tunable_loops = [
1208
1083
        RevisionCachePruner,
1209
1084
        BugWatchScheduler,
 
1085
        AntiqueSessionPruner,
1210
1086
        UnusedSessionPruner,
1211
1087
        DuplicateSessionPruner,
1212
1088
        BugHeatUpdater,
1219
1095
 
1220
1096
 
1221
1097
class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
1222
 
    """Run every day.
1223
 
 
1224
 
    Jobs that don't need to be run frequently.
1225
 
 
1226
 
    If there is low overhead, consider putting these tasks in more
1227
 
    frequently invoked lists to distribute the work more evenly.
1228
 
    """
1229
1098
    script_name = 'garbo-daily'
1230
1099
    tunable_loops = [
1231
 
        AnswerContactPruner,
1232
1100
        BranchJobPruner,
1233
1101
        BugNotificationPruner,
1234
1102
        BugWatchActivityPruner,
1235
1103
        CodeImportEventPruner,
1236
1104
        CodeImportResultPruner,
1237
1105
        HWSubmissionEmailLinker,
1238
 
        LoginTokenPruner,
1239
1106
        ObsoleteBugAttachmentPruner,
1240
1107
        OldTimeLimitedTokenDeleter,
1241
1108
        RevisionAuthorEmailLinker,
1242
1109
        SuggestiveTemplatesCacheUpdater,
1243
1110
        POTranslationPruner,
1244
 
        UnusedPOTMsgSetPruner,
1245
1111
        ]
1246
1112
    experimental_tunable_loops = [
1247
1113
        PersonPruner,