~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""A set of functions related to the ability to parse the XML CVE database,
extract details of known CVE entries, and ensure that all of the known
CVEs are fully registered in Launchpad."""

__metaclass__ = type

import gzip
import StringIO
import time
import urllib2
import xml.etree.cElementTree as cElementTree

from zope.component import getUtility
from zope.event import notify
from zope.interface import implements
from zope.lifecycleevent import ObjectModifiedEvent

from lp.bugs.interfaces.cve import (
    CveStatus,
    ICveSet,
    )
from lp.services.config import config
from lp.services.looptuner import (
    ITunableLoop,
    LoopTuner,
    )
from lp.services.scripts.base import (
    LaunchpadCronScript,
    LaunchpadScriptFailure,
    )


CVEDB_NS = '{http://cve.mitre.org/cve/downloads}'


def getText(elem):
    """Return the text contained in `elem`, with outer whitespace stripped.

    The element's own text comes first, followed for each child (in
    document order) by that child's text, gathered recursively, and then
    the child's tail text.  Because the recursion strips its result at
    every level, each child's contribution is stripped before it is
    appended.
    """
    fragments = [elem.text or ""]
    for child in elem:
        fragments.append(getText(child))
        tail = child.tail
        if tail:
            fragments.append(tail)
    return "".join(fragments).strip()


def handle_references(cve_node, cve, log):
    """Make the references of `cve` match those found in `cve_node`.

    Every `ref` element (in the CVEDB_NS namespace) below `cve_node` is
    compared against the references `cve` had when this function was
    called.  A ref whose source, url and text content all match an
    existing reference is kept; any other ref is created on `cve`.
    Existing references that matched no ref in the XML are then removed.

    :param cve_node: The XML element describing one CVE.
    :param cve: The CVE database object; its `references`, `sequence`,
        `createReference` and `removeReference` are used.
    :param log: A logger; each creation and removal is logged at info
        level.
    :return: True if any reference was created or removed, else False.
    """
    changed = False
    # Snapshot of the references present before this run; anything in
    # here that is not also in `still_wanted` at the end gets removed.
    existing = set(cve.references)
    still_wanted = set()

    # Walk the refs listed in the XML dump.
    ref_tag = './/%sref' % CVEDB_NS
    for ref_node in cve_node.findall(ref_tag):
        source = ref_node.get("source")
        url = ref_node.get("url")
        content = getText(ref_node)
        for candidate in existing:
            if (candidate.source == source and candidate.url == url
                    and candidate.content == content):
                # Already known: keep it.
                still_wanted.add(candidate)
                break
        else:
            # No existing reference matched, so record a new one.
            log.info(
                "Creating new %s reference for %s" % (source, cve.sequence))
            still_wanted.add(cve.createReference(source, content, url=url))
            changed = True

    def removal_order(reference):
        return (reference.source, reference.content, reference.url)

    # Drop the previously known references the XML no longer lists, in a
    # stable (source, content, url) order.
    for stale in sorted(existing, key=removal_order):
        if stale in still_wanted:
            continue
        log.info(
            "Removing %s reference for %s" % (stale.source, cve.sequence))
        cve.removeReference(stale)
        changed = True

    return changed


def update_one_cve(cve_node, log):
    """Create or update the database CVE described by one XML item.

    The item's `seq` and `type` attributes and its `desc` child (in the
    CVEDB_NS namespace) are read.  The CVE is looked up through `ICveSet`
    and created if the lookup yields None; its status and description are
    then brought in line with the XML, and its references are synchronised
    with `handle_references`.  An `ObjectModifiedEvent` is sent if the
    status, the description or the references changed.

    :param cve_node: The XML element for a single CVE item.
    :param log: A logger used for debug, info and error messages.
    :return: None.  An item whose `type` is neither 'CAN' nor 'CVE' is
        logged as an error and skipped.
    """
    # get the sequence number
    sequence = cve_node.get('seq')
    # establish its status
    status = cve_node.get('type')
    # get the description; find() returns None when the item has no
    # <desc> child, which is treated like an empty description instead of
    # letting getText() fail on None.
    desc_node = cve_node.find(CVEDB_NS + 'desc')
    if desc_node is None:
        description = ''
    else:
        description = getText(desc_node)
    if not description:
        log.debug('No description for CVE-%s' % sequence)
    if status == 'CAN':
        new_status = CveStatus.CANDIDATE
    elif status == 'CVE':
        new_status = CveStatus.ENTRY
    else:
        log.error('Unknown status %s for CVE-%s' % (status, sequence))
        return
    # find or create the CVE entry in the db
    cveset = getUtility(ICveSet)
    cve = cveset[sequence]
    if cve is None:
        cve = cveset.new(sequence, description, new_status)
        log.info('CVE-%s created' % sequence)
    # update the CVE if needed
    modified = False
    if cve.status != new_status:
        log.info('CVE-%s changed from %s to %s' % (cve.sequence,
            cve.status.title, new_status.title))
        cve.status = new_status
        modified = True
    if cve.description != description:
        log.info('CVE-%s updated description' % cve.sequence)
        cve.description = description
        modified = True
    # make sure we have copies of all the references.
    if handle_references(cve_node, cve, log):
        modified = True
    # trigger an event if modified
    if modified:
        notify(ObjectModifiedEvent(cve))


class CveUpdaterTunableLoop(object):
    """An `ITunableLoop` that updates CVEs one batch per call.

    Each call processes a slice of `cves` inside its own transaction,
    starting at `offset`.  `offset` becomes None once a call finds nothing
    left to process, which is what `isDone` reports.
    """

    implements(ITunableLoop)

    # Number of CVE items handed to update_one_cve so far.
    total_updated = 0

    def __init__(self, cves, transaction, logger, offset=0):
        self.total_updated = 0
        self.offset = offset
        self.logger = logger
        self.transaction = transaction
        self.cves = cves

    def isDone(self):
        """See `ITunableLoop`."""
        return self.offset is None

    def __call__(self, chunk_size):
        """Update the next `chunk_size` CVEs in a single transaction.

        See `ITunableLoop`.
        """
        chunk_size = int(chunk_size)

        self.logger.debug("More %d" % chunk_size)

        first = self.offset
        stop = first + chunk_size

        self.transaction.begin()

        batch = self.cves[first:stop]
        # Assume nothing is left; processing any item below sets the
        # offset to the position just after that item.
        self.offset = None
        for next_offset, cve_node in enumerate(batch, first + 1):
            self.offset = next_offset
            update_one_cve(cve_node, self.logger)
            self.total_updated += 1

        self.logger.debug("Committing.")
        self.transaction.commit()


class CVEUpdater(LaunchpadCronScript):
    """Cron script that loads the CVE XML database and updates Launchpad.

    The XML is read from a local file (--cvefile) or, failing that,
    downloaded as a gzipped file from a URL (--cveurl).  Each CVE item in
    it is then passed to `update_one_cve`, in batches sized by a
    `LoopTuner`.
    """

    def add_my_options(self):
        """Register the --cvefile and --cveurl command line options."""
        self.parser.add_option(
            "-f", "--cvefile", dest="cvefile", default=None,
            help="An XML file containing the CVE database.")
        self.parser.add_option(
            "-u", "--cveurl", dest="cveurl",
            default=config.cveupdater.cve_db_url,
            help="The URL for the gzipped XML CVE database.")

    def main(self):
        """Obtain the CVE XML, process it and log how long that took.

        A file given with --cvefile takes precedence over the URL.

        :raises LaunchpadScriptFailure: if the file cannot be read, the
            URL cannot be opened, or neither a file nor a URL is set.
        """
        self.logger.info('Initializing...')
        if self.options.cvefile is not None:
            try:
                # The context manager closes the file even if read()
                # fails, rather than leaving that to garbage collection.
                with open(self.options.cvefile, 'r') as cve_file:
                    cve_db = cve_file.read()
            except IOError:
                raise LaunchpadScriptFailure(
                    'Unable to open CVE database in %s'
                    % self.options.cvefile)

        elif self.options.cveurl is not None:
            self.logger.info("Downloading CVE database from %s..." %
                             self.options.cveurl)
            try:
                url = urllib2.urlopen(self.options.cveurl)
            except (urllib2.HTTPError, urllib2.URLError):
                raise LaunchpadScriptFailure(
                    'Unable to connect for CVE database %s'
                    % self.options.cveurl)

            # Always release the connection once the body has been read.
            try:
                cve_db_gz = url.read()
            finally:
                url.close()
            self.logger.info("%d bytes downloaded." % len(cve_db_gz))
            # The download is gzipped; decompress it in memory.
            cve_db = gzip.GzipFile(
                fileobj=StringIO.StringIO(cve_db_gz)).read()
        else:
            raise LaunchpadScriptFailure('No CVE database file or URL given.')

        # Start analysing the data.
        start_time = time.time()
        self.logger.info("Processing CVE XML...")
        self.processCVEXML(cve_db)
        finish_time = time.time()
        self.logger.info('%d seconds to update database.'
                % (finish_time - start_time))

    def processCVEXML(self, cve_xml):
        """Process the CVE XML file.

        :param cve_xml: The CVE XML as a string.
        :raises LaunchpadScriptFailure: if the XML contains no `item`
            elements (in the CVEDB_NS namespace) directly below the root.
        """
        dom = cElementTree.fromstring(cve_xml)
        items = dom.findall(CVEDB_NS + 'item')
        if not items:
            raise LaunchpadScriptFailure("No CVEs found in XML file.")
        self.logger.info("Updating database...")

        # We use Looptuner to control the ideal number of CVEs
        # processed in each transaction, during at least 2 seconds.
        loop = CveUpdaterTunableLoop(items, self.txn, self.logger)
        loop_tuner = LoopTuner(loop, 2)
        loop_tuner.run()