~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/python -S
#
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=W0403
"""Remove all translations from upstream.

This script is useful to recover from breakages after importing bad
.po files like the one reported at #32610.
"""

import _pythonpath

import sys
import logging
from optparse import OptionParser

from zope.component import getUtility

from canonical.config import config
from canonical.database.constants import UTC_NOW
from canonical.lp import initZopeless
from canonical.launchpad.scripts import (
    execute_zcml_for_scripts, logger, logger_options)
from canonical.launchpad.interfaces import ILaunchpadCelebrities
from lp.registry.interfaces.product import IProductSet
from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.distroseries import IDistroSeriesSet
from lp.registry.interfaces.sourcepackagename import (
    ISourcePackageNameSet)
from lp.translations.interfaces.potemplate import IPOTemplateSet
from lp.translations.interfaces.translationmessage import (
    RosettaTranslationOrigin)



logger_name = 'remove-upstream-translations'

def parse_options(args):
    """Parse a set of command line options.

    Return an optparse.Values object.
    """
    parser = OptionParser()

    parser.add_option("-p", "--product", dest="product",
        help="The product where we should look for translations.")
    parser.add_option("-s", "--series", dest="series",
        help="The product series where we should look for translations.")
    parser.add_option("-d", "--distro", dest="distro",
        help="The distribution where we should look for translations.")
    parser.add_option("-r", "--distroseries", dest="distroseries",
        help="The distribution series where we should look for translations."
        )
    parser.add_option("-n", "--sourcepackagename", dest="sourcepackagename",
        help="The distribution where we should look for translations.")
    parser.add_option("-t", "--potemplatename", dest="potemplatename",
        help="The PO Template name where we should look for translations.")
    parser.add_option("-l", "--language-code", dest="languagecode",
        help="The language code where we should look for translations.")

    # Add the verbose/quiet options.
    logger_options(parser)

    (options, args) = parser.parse_args(args)

    return options

def remove_upstream_entries(ztm, potemplates, lang_code=None, variant=None):
    """Remove all translations that came from upstream.

    :arg ztm: Zope transaction manager.
    :arg potemplates: A set of potemplates that we should process.
    :arg lang_code: A string with a language code where we should do the
        removal.
    :arg variant: A language variant that we should use with the lang_code to
        locate the translations to remove.

    If lang_code is None, we process all available languages.
    """
    assert ((lang_code is None and variant is None) or
            (lang_code is not None)), (
                'variant cannot be != None if lang_code is None')

    logger_object = logging.getLogger(logger_name)

    items_deleted = 0
    # All changes should be logged as done by Rosetta Expert team.
    rosetta_experts = getUtility(ILaunchpadCelebrities).rosetta_experts

    for potemplate in potemplates:
        if lang_code is None:
            pofiles = sorted(
                list(potemplate.pofiles),
                key=lambda p: (p.language.code, p.variant))
        else:
            pofile = potemplate.getPOFileByLang(lang_code, variant)
            if pofile is None:
                pofiles = []
            else:
                pofiles = [pofile]

        for pofile in pofiles:
            logger_object.debug('Processing %s...' % pofile.title)
            pofile_items_deleted = 0
            for message in pofile.translation_messages:
                active_changed = False
                if message.origin == RosettaTranslationOrigin.SCM:
                    if message.is_current:
                        active_changed = True
                    message.destroySelf()
                    pofile_items_deleted += 1
                if active_changed:
                    message.pofile.date_changed = UTC_NOW
                    message.pofile.lasttranslator = rosetta_experts
                    message.reviewer = rosetta_experts
                    message.date_reviewed = UTC_NOW

            items_deleted += pofile_items_deleted
            logger_object.debug(
                 'Removed %d submissions' % pofile_items_deleted)
            pofile.updateStatistics()
            ztm.commit()

    # We finished the removal process, is time to notify the amount of entries
    # that we removed.
    logger_object.debug(
        'Removed %d submissions in total.' % items_deleted)


def main(argv):
    options = parse_options(argv[1:])
    logger_object = logger(options, logger_name)

    execute_zcml_for_scripts()
    ztm = initZopeless(dbuser=config.rosetta.admin_dbuser)

    product = None
    series = None
    distro = None
    distroseries = None
    sourcepackagename = None
    potemplatename = None
    language_code = None
    if options.product is not None:
        productset = getUtility(IProductSet)
        product = productset.getByName(options.product)
        if product is None:
            logger_object.error(
                'The %s product does not exist.' % options.product)
            return 1

    if options.series is not None:
        if product is None:
            logger_object.error(
                'You need to specify a product if you want to select a'
                ' productseries.')
            return 1

        series = product.getSeries(options.series)
        if series is None:
            logger_object.error(
                'The %s series does not exist inside %s product.' % (
                    options.series, options.product))
            return 1

    if options.distro is not None:
        if product is not None:
            logger_object.error(
                'You cannot mix distributions and products.')
            return 1
        distroset = getUtility(IDistributionSet)
        distro = distroset.getByName(options.distro)
        if distro is None:
            logger_object.error(
                'The %s distribution does not exist.' % options.distro)
            return 1

    if options.distroseries is not None:
        if distro is None:
            logger_object.error(
                'You need to specify a distribution if you want to select a'
                ' sourcepackagename.')
        distroseriesset = getUtility(IDistroSeriesSet)
        distroseries = distroseriesset.queryByName(
            distro, options.distroseries)
        if distroseries is None:
            logger_object.error(
                'The %s distribution does not exist.' % options.distroseries)
            return 1

    if options.sourcepackagename is not None:
        if distroseries is None:
            logger_object.error(
                'You need to specify a distribution series if you want to'
                ' select a sourcepackagename.')
            return 1
        sourcepackagenameset = getUtility(ISourcePackageNameSet)
        sourcepackagename = sourcepackagenameset.queryByName(
            options.sourcepackagename)
        if sourcepackagename is None:
            logger_object.error(
                'The %s sourcepackagename does not exist.' % (
                    options.sourcepackagename))
            return 1

    potemplateset = getUtility(IPOTemplateSet)
    if series is None and distroseries is None:
        if options.potemplatename is None:
            logger_object.warning('Nothing to do. Exiting...')
            return 0
        else:
            potemplates = potemplateset.getAllByName(
                options.potemplatename)
    else:
        potemplate_subset = potemplateset.getSubset(
            distroseries=distroseries, sourcepackagename=sourcepackagename,
            productseries=series)
        if options.potemplatename is not None:
            potemplate = potemplate_subset.getPOTemplateByName(
                options.potemplatename)
            potemplates = [potemplate]
        else:
            # Get a list from the subset of potemplates to be able to do
            # transaction commits.
            potemplates = list(potemplate_subset)

    lang_code = None
    variant = None
    if options.languagecode is not None:
        if '@' in options.languagecode:
            lang_code, variant = options.languagecode.split('@')
        else:
            lang_code = options.languagecode

    remove_upstream_entries(ztm, potemplates, lang_code, variant)

if __name__ == '__main__':
    sys.exit(main(sys.argv))