~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/scripts/garbo.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-08-05 16:26:42 UTC
  • mfrom: (13588.1.17 local-latency-port)
  • Revision ID: launchpad@pqm.canonical.com-20110805162642-hgal6meu10l3utts
[r=henninge][bug=821482][no-qa] Add --port option.

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
from canonical.launchpad.database.librarian import TimeLimitedToken
50
50
from canonical.launchpad.database.oauth import OAuthNonce
51
51
from canonical.launchpad.database.openidconsumer import OpenIDConsumerNonce
52
 
from canonical.launchpad.interfaces.account import AccountStatus
53
52
from canonical.launchpad.interfaces.emailaddress import EmailAddressStatus
54
53
from canonical.launchpad.interfaces.lpstorm import IMasterStore
55
54
from canonical.launchpad.utilities.looptuner import TunableLoop
58
57
    MAIN_STORE,
59
58
    MASTER_FLAVOR,
60
59
    )
61
 
from lp.answers.model.answercontact import AnswerContact
62
60
from lp.bugs.interfaces.bug import IBugSet
63
61
from lp.bugs.model.bug import Bug
64
62
from lp.bugs.model.bugattachment import BugAttachment
96
94
    )
97
95
 
98
96
 
99
 
ONE_DAY_IN_SECONDS = 24 * 60 * 60
 
97
ONE_DAY_IN_SECONDS = 24*60*60
100
98
 
101
99
 
102
100
class BulkPruner(TunableLoop):
307
305
 
308
306
    We remove all OpenIDConsumerNonce records older than 1 day.
309
307
    """
310
 
    maximum_chunk_size = 6 * 60 * 60  # 6 hours in seconds.
 
308
    maximum_chunk_size = 6*60*60 # 6 hours in seconds.
311
309
 
312
310
    def __init__(self, log, abort_time=None):
313
311
        super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time)
613
611
        self.max_offset = self.store.execute(
614
612
            "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0]
615
613
        if self.max_offset is None:
616
 
            self.max_offset = -1  # Trigger isDone() now.
 
614
            self.max_offset = -1 # Trigger isDone() now.
617
615
            self.log.debug("No Person records to remove.")
618
616
        else:
619
617
            self.log.info("%d Person records to remove." % self.max_offset)
679
677
        """
680
678
 
681
679
 
682
 
class AnswerContactPruner(BulkPruner):
683
 
    """Remove old answer contacts which are no longer required.
684
 
 
685
 
    Remove a person as an answer contact if:
686
 
      their account has been deactivated for more than one day, or
687
 
      suspended for more than one week.
688
 
    """
689
 
    target_table_class = AnswerContact
690
 
    ids_to_prune_query = """
691
 
        SELECT DISTINCT AnswerContact.id
692
 
        FROM AnswerContact, Person, Account
693
 
        WHERE
694
 
            AnswerContact.person = Person.id
695
 
            AND Person.account = Account.id
696
 
            AND (
697
 
                (Account.date_status_set <
698
 
                CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
699
 
                - CAST('1 day' AS interval)
700
 
                AND Account.status = %s)
701
 
                OR
702
 
                (Account.date_status_set <
703
 
                CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
704
 
                - CAST('7 days' AS interval)
705
 
                AND Account.status = %s)
706
 
            )
707
 
        """ % (AccountStatus.DEACTIVATED.value, AccountStatus.SUSPENDED.value)
708
 
 
709
 
 
710
680
class BranchJobPruner(BulkPruner):
711
681
    """Prune `BranchJob`s that are in a final state and more than a month old.
712
682
 
730
700
    Only needed until they are all set, after that triggers will maintain it.
731
701
    """
732
702
 
733
 
    # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and that's
734
 
    # the max we want to hold a DB lock open for.
 
703
    # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the
 
704
    # max we want to hold a DB lock open for.
735
705
    minimum_chunk_size = 1000
736
706
    maximum_chunk_size = 5000
737
707
 
739
709
        super(MirrorBugMessageOwner, self).__init__(log, abort_time)
740
710
        self.store = IMasterStore(BugMessage)
741
711
        self.isDone = IMasterStore(BugMessage).find(
742
 
            BugMessage, BugMessage.ownerID == None).is_empty
 
712
            BugMessage, BugMessage.ownerID==None).is_empty
743
713
 
744
714
    def __call__(self, chunk_size):
745
715
        """See `ITunableLoop`."""
841
811
class OldTimeLimitedTokenDeleter(TunableLoop):
842
812
    """Delete expired url access tokens from the session DB."""
843
813
 
844
 
    maximum_chunk_size = 24 * 60 * 60  # 24 hours in seconds.
 
814
    maximum_chunk_size = 24*60*60 # 24 hours in seconds.
845
815
 
846
816
    def __init__(self, log, abort_time=None):
847
817
        super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time)
963
933
 
964
934
class BaseDatabaseGarbageCollector(LaunchpadCronScript):
965
935
    """Abstract base class to run a collection of TunableLoops."""
966
 
    script_name = None  # Script name for locking and database user. Override.
967
 
    tunable_loops = None  # Collection of TunableLoops. Override.
968
 
    continue_on_failure = False  # If True, an exception in a tunable loop
969
 
                                 # does not cause the script to abort.
 
936
    script_name = None # Script name for locking and database user. Override.
 
937
    tunable_loops = None # Collection of TunableLoops. Override.
 
938
    continue_on_failure = False # If True, an exception in a tunable loop
 
939
                                # does not cause the script to abort.
970
940
 
971
941
    # Default run time of the script in seconds. Override.
972
942
    default_abort_script_time = None
1017
987
        for count in range(0, self.options.threads):
1018
988
            thread = threading.Thread(
1019
989
                target=self.run_tasks_in_thread,
1020
 
                name='Worker-%d' % (count + 1,),
 
990
                name='Worker-%d' % (count+1,),
1021
991
                args=(tunable_loops,))
1022
992
            thread.start()
1023
993
            threads.add(thread)
1051
1021
 
1052
1022
    @property
1053
1023
    def script_timeout(self):
1054
 
        a_very_long_time = 31536000  # 1 year
 
1024
        a_very_long_time = 31536000 # 1 year
1055
1025
        return self.options.abort_script or a_very_long_time
1056
1026
 
1057
1027
    def get_loop_logger(self, loop_name):
1064
1034
        loop_logger = logging.getLogger('garbo.' + loop_name)
1065
1035
        for filter in loop_logger.filters:
1066
1036
            if isinstance(filter, PrefixFilter):
1067
 
                return loop_logger  # Already have a PrefixFilter attached.
 
1037
                return loop_logger # Already have a PrefixFilter attached.
1068
1038
        loop_logger.addFilter(PrefixFilter(loop_name))
1069
1039
        return loop_logger
1070
1040
 
1136
1106
                    loop_logger.debug3(
1137
1107
                        "Unable to acquire lock %s. Running elsewhere?",
1138
1108
                        loop_lock_path)
1139
 
                    time.sleep(0.3)  # Avoid spinning.
 
1109
                    time.sleep(0.3) # Avoid spinning.
1140
1110
                    tunable_loops.append(tunable_loop_class)
1141
1111
                # Otherwise, emit a warning and skip the task.
1142
1112
                else:
1199
1169
class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
1200
1170
    script_name = 'garbo-daily'
1201
1171
    tunable_loops = [
1202
 
        AnswerContactPruner,
1203
1172
        BranchJobPruner,
1204
1173
        BugNotificationPruner,
1205
1174
        BugWatchActivityPruner,