~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/python -S
#
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=C0103,W0403

import _pythonpath

from zope.component import getUtility

from canonical.config import config
from canonical.database.sqlbase import (
    ISOLATION_LEVEL_AUTOCOMMIT, flush_database_updates)
from lp.app.errors import NotFoundError
from lp.registry.interfaces.karma import IKarmaCacheManager
from lp.services.scripts.base import LaunchpadCronScript


class KarmaCacheUpdater(LaunchpadCronScript):
    def main(self):
        """Update the KarmaCache table for all valid Launchpad users.

        For each Launchpad user with a preferred email address, calculate his
        karmavalue for each category of actions we have and update his entry
        in the KarmaCache table. If a user doesn't have an entry for that
        category in KarmaCache a new one will be created.

        Entries in the KarmaTotalCache table will also be created/updated for
        each user which has entries in the KarmaCache table. Any user which
        doesn't have any entries in the KarmaCache table has its entries
        removed from the KarmaTotalCache table as well.
        """
        self.logger.info("Updating Launchpad karma caches")

        # We use the autocommit transaction isolation level to minimize
        # contention. It also allows us to not bother explicitly calling
        # COMMIT all the time. However, if we interrupt this script mid-run
        # it will need to be re-run as the data will be inconsistent (only
        # part of the caches will have been recalculated).
        self.txn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

        self.cur = self.txn.conn().cursor()
        self.karmacachemanager = getUtility(IKarmaCacheManager)

        # This method ordering needs to be preserved. In particular,
        # C_add_summed_totals method is called last because we don't want to
        # include the values added in our calculation in A_update_karmacache.
        self.A_update_karmacache()
        self.B_update_karmatotalcache()
        self.C_add_karmacache_sums()

        self.logger.info("Finished updating Launchpad karma caches")

    def A_update_karmacache(self):
        self.logger.info("Step A: Calculating individual KarmaCache entries")

        # Calculate everyones karma. Karma degrades each day, becoming
        # worthless after karma_expires_after. This query produces odd results
        # when datecreated is in the future, but there is really no point
        # adding the extra WHEN clause.
        karma_expires_after = '1 year'
        self.cur.execute("""
            SELECT person, category, product, distribution,
                ROUND(SUM(
                CASE WHEN karma.datecreated + %s::interval
                    <= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' THEN 0
                ELSE points * (1 - extract(
                    EPOCH FROM CURRENT_TIMESTAMP AT TIME ZONE 'UTC' -
                    karma.datecreated
                    ) / extract(EPOCH FROM %s::interval))
                END
                ))
            FROM Karma
            JOIN KarmaAction ON action = KarmaAction.id
            GROUP BY person, category, product, distribution
            """, (karma_expires_after, karma_expires_after))

        # Suck into RAM to avoid tieing up resources on the DB.
        results = list(self.cur.fetchall())
        self.logger.debug("Got %d (person, category) scores", len(results))

        # Note that we don't need to commit each iteration because we are
        # running in autocommit mode.
        scaling = self.calculate_scaling(results)
        for entry in results:
            self.update_one_karma_cache_entry(entry, scaling)
        flush_database_updates()

        # Delete the entries we're going to replace.
        self.cur.execute("DELETE FROM KarmaCache WHERE category IS NULL")
        self.cur.execute("""
            DELETE FROM KarmaCache
            WHERE project IS NOT NULL AND product IS NULL""")
        self.cur.execute("""
            DELETE FROM KarmaCache
            WHERE category IS NOT NULL AND project IS NULL AND product IS NULL
                  AND distribution IS NULL AND sourcepackagename IS NULL""")

        # Don't allow our table to bloat with inactive users.
        self.cur.execute("DELETE FROM KarmaCache WHERE karmavalue <= 0")

        # VACUUM KarmaCache since we have just touched every record in it.
        self.cur.execute("""VACUUM KarmaCache""")

    def B_update_karmatotalcache(self):
        self.logger.info("Step B: Rebuilding KarmaTotalCache")
        # Trash old records
        self.cur.execute("""
            DELETE FROM KarmaTotalCache
            WHERE person NOT IN (SELECT person FROM KarmaCache)
            """)
        # Update existing records.
        self.cur.execute("""
            UPDATE KarmaTotalCache SET karma_total=sum_karmavalue
            FROM (
                SELECT person AS sum_person, SUM(karmavalue) AS sum_karmavalue
                FROM KarmaCache
                GROUP BY person
                ) AS sums
            WHERE KarmaTotalCache.person = sum_person
            """)

        # VACUUM KarmaTotalCache since we have just touched every row in it.
        self.cur.execute("""VACUUM KarmaTotalCache""")

        # Insert new records into the KarmaTotalCache table.

        # XXX: salgado 2007-02-06:
        # If deadlocks ever become a problem, first LOCK the
        # corresponding rows in the Person table so the bulk insert cannot
        # fail. We don't bother at the moment as this would involve granting
        # UPDATE rights on the Person table to the karmacacheupdater user.
        ## cur.execute("BEGIN")
        ## cur.execute("""
        ##     SELECT * FROM Person
        ##     WHERE id NOT IN (SELECT person FROM KarmaTotalCache)
        ##     FOR UPDATE
        ##     """)

        self.cur.execute("""
            INSERT INTO KarmaTotalCache (person, karma_total)
            SELECT person, SUM(karmavalue) FROM KarmaCache
            WHERE person NOT IN (SELECT person FROM KarmaTotalCache)
            GROUP BY person
            """)

        ## self.cur.execute("COMMIT")

    def C_add_karmacache_sums(self):
        self.logger.info("Step C: Calculating KarmaCache sums")
        # We must issue some SUM queries to insert the karma totals for:
        # - All actions of a person on a given product.
        # - All actions of a person on a given distribution.
        # - All actions of a person on a given project.
        # - All actions with a specific category of a person on a given
        #   project.
        # - All actions with a specific category of a person.

        # - All actions with a specific category of a person.
        self.cur.execute("""
            INSERT INTO KarmaCache
                (person, category, karmavalue, product, distribution,
                 sourcepackagename, project)
            SELECT person, category, SUM(karmavalue), NULL, NULL, NULL, NULL
            FROM KarmaCache
            WHERE category IS NOT NULL
            GROUP BY person, category
            """)

        # - All actions of a person on a given product.
        self.cur.execute("""
            INSERT INTO KarmaCache
                (person, category, karmavalue, product, distribution,
                 sourcepackagename, project)
            SELECT person, NULL, SUM(karmavalue), product, NULL, NULL, NULL
            FROM KarmaCache
            WHERE product IS NOT NULL
            GROUP BY person, product
            """)

        # - All actions of a person on a given distribution.
        self.cur.execute("""
            INSERT INTO KarmaCache
                (person, category, karmavalue, product, distribution,
                 sourcepackagename, project)
            SELECT person, NULL, SUM(karmavalue), NULL, distribution, NULL, NULL
            FROM KarmaCache
            WHERE distribution IS NOT NULL
            GROUP BY person, distribution
            """)

        # - All actions of a person on a given project.
        self.cur.execute("""
            INSERT INTO KarmaCache
                (person, category, karmavalue, product, distribution,
                 sourcepackagename, project)
            SELECT person, NULL, SUM(karmavalue), NULL, NULL, NULL,
                   Product.project
            FROM KarmaCache
            JOIN Product ON product = Product.id
            WHERE Product.project IS NOT NULL AND product IS NOT NULL
                  AND category IS NOT NULL
            GROUP BY person, Product.project
            """)

        # - All actions with a specific category of a person on a given project
        # IMPORTANT: This has to be the latest step; otherwise the rows
        # inserted here will be included in the calculation of the overall
        # karma of a person on a given project.
        self.cur.execute("""
            INSERT INTO KarmaCache
                (person, category, karmavalue, product, distribution,
                 sourcepackagename, project)
            SELECT person, category, SUM(karmavalue), NULL, NULL, NULL,
                   Product.project
            FROM KarmaCache
            JOIN Product ON product = Product.id
            WHERE Product.project IS NOT NULL AND product IS NOT NULL
                  AND category IS NOT NULL
            GROUP BY person, category, Product.project
            """)

    def calculate_scaling(self, results):
        """Return a dict of scaling factors keyed on category ID"""

        # Get a list of categories, which we will need shortly.
        categories = {}
        self.cur.execute("SELECT id, name from KarmaCategory")
        for id, name in self.cur.fetchall():
            categories[id] = name

        # Calculate normalization factor for each category. We currently have
        # category bloat, where translators dominate the top karma rankings.
        # By calculating a scaling factor automatically, this slant will be
        # removed even as more events are added or scoring tweaked.
        points_per_category = {}
        for dummy, category, dummy, dummy, points in results:
            if category not in points_per_category:
                points_per_category[category] = 0
            points_per_category[category] += points
        largest_total = max(points_per_category.values())

        scaling = {}
        for category, points in points_per_category.items():
            if points == 0:
                scaling[category] = 1
            else:
                scaling[category] = float(largest_total) / float(points)
            max_scaling = config.karmacacheupdater.max_scaling
            if scaling[category] > max_scaling:
                self.logger.info(
                    'Scaling %s by a factor of %0.4f (capped to %0.4f)'
                    % (categories[category], scaling[category], max_scaling))
                scaling[category] = max_scaling
            else:
                self.logger.info(
                    'Scaling %s by a factor of %0.4f'
                    % (categories[category], scaling[category]))
        return scaling

    def update_one_karma_cache_entry(self, entry, scaling):
        """Updates an individual (non-summed) KarmaCache entry.

        KarmaCache has individual entries, and then it has the summed entries
        that correspond to overall contributions across all categories. Look
        at C_add_summed_totals to see how the summed entries are generated.
        """
        (person_id, category_id, product_id, distribution_id, points) = entry
        points *= scaling[category_id] # Scaled. wow.
        self.logger.debug("Setting person_id=%d, category_id=%d, points=%d"
                          % (person_id, category_id, points))

        points = int(points)
        context = {'product_id': product_id,
                   'distribution_id': distribution_id}

        try:
            self.karmacachemanager.updateKarmaValue(
                points, person_id, category_id, **context)
            self.logger.debug(
                "Updated karmacache for person=%s, points=%s, category=%s, "
                "context=%s" % (person_id, points, category_id, context))
        except NotFoundError:
            # Row didn't exist; do an insert.
            self.karmacachemanager.new(
                points, person_id, category_id, **context)
            self.logger.debug(
                "Created karmacache for person=%s, points=%s, category=%s, "
                "context=%s" % (person_id, points, category_id, context))


if __name__ == '__main__':
    script = KarmaCacheUpdater('karma-update',
        dbuser=config.karmacacheupdater.dbuser)
    script.lock_and_run(implicit_begin=True)