~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=E0611,W0212

__metaclass__ = type

__all__ = [
    'BuildQueue',
    'BuildQueueSet',
    'specific_job_classes',
    ]

from collections import defaultdict
from datetime import (
    datetime,
    timedelta,
    )
from itertools import groupby
import logging
from operator import attrgetter

import pytz
from sqlobject import (
    BoolCol,
    ForeignKey,
    IntCol,
    IntervalCol,
    SQLObjectNotFound,
    StringCol,
    )
from zope.component import (
    getSiteManager,
    getUtility,
    )
from zope.interface import implements

from lp.app.errors import NotFoundError
from lp.buildmaster.enums import BuildFarmJobType
from lp.buildmaster.interfaces.buildfarmjob import IBuildFarmJob
from lp.buildmaster.interfaces.buildfarmjobbehavior import (
    IBuildFarmJobBehavior,
    )
from lp.buildmaster.interfaces.buildqueue import (
    IBuildQueue,
    IBuildQueueSet,
    )
from lp.services.database.constants import DEFAULT
from lp.services.database.enumcol import EnumCol
from lp.services.database.sqlbase import (
    SQLBase,
    sqlvalues,
    )
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
from lp.services.propertycache import (
    cachedproperty,
    get_property_cache,
    )
from lp.services.webapp.interfaces import (
    DEFAULT_FLAVOR,
    IStoreSelector,
    MAIN_STORE,
    )


def normalize_virtualization(virtualized):
    """Map a possibly-NULL virtualization flag to its effective value.

    Jobs whose virtualization setting is NULL (None) are handled the same
    way as virtualized jobs.

    :param virtualized: The raw flag: None, True or False.
    :return: True when `virtualized` is None, otherwise `virtualized`
        itself, unchanged.
    """
    if virtualized is None:
        return True
    return virtualized


def specific_job_classes():
    """Build the lookup of job classes that may run on the build farm.

    :return: A dict mapping each `BuildFarmJobType` enum item to the
        class registered as an `IBuildFarmJob` utility under that item's
        name.
    """
    # Every registered `IBuildFarmJob` utility comes back as a
    # (name, class) pair; the name is that of a `BuildFarmJobType` item.
    registered = sorted(getSiteManager().getUtilitiesFor(IBuildFarmJob))
    return dict(
        (getattr(BuildFarmJobType, enum_name), implementation)
        for enum_name, implementation in registered)


def get_builder_data():
    """Count the working, non-manual builders per platform.

    :return: A `defaultdict(int)` keyed by (processor, virtualized)
        tuples.  Besides one entry per combination found in the builder
        table it holds three processor-independent totals:
        (None, True) and (None, None) are both the number of virtualized
        builders, (None, False) is the number of all other builders.
    """
    count_query = """
        SELECT processor, virtualized, COUNT(id) FROM builder
        WHERE builderok = TRUE AND manual = FALSE
        GROUP BY processor, virtualized;
    """
    store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
    rows = store.execute(count_query).get_all()

    builder_stats = defaultdict(int)
    all_builders = 0
    virtual_builders = 0
    for processor, virtualized, count in rows:
        builder_stats[(processor, virtualized)] = count
        all_builders += count
        if virtualized:
            virtual_builders += count

    # Processor-independent totals.  A NULL virtualized flag on a job is
    # equivalent to virtualized=TRUE, hence the identical (None, None)
    # and (None, True) entries.
    builder_stats[(None, True)] = virtual_builders
    builder_stats[(None, None)] = virtual_builders
    builder_stats[(None, False)] = all_builders - virtual_builders
    return builder_stats


class BuildQueue(SQLBase):
    """A row of the "BuildQueue" table, declared to implement `IBuildQueue`.

    Each entry links a generic `Job` to the build-farm related data kept
    in the columns below (job type, builder, score, platform, ...).
    """
    implements(IBuildQueue)
    _table = "BuildQueue"
    _defaultOrder = "id"

    def __init__(self, job, job_type=DEFAULT,  estimated_duration=DEFAULT,
                 virtualized=DEFAULT, processor=DEFAULT, lastscore=None):
        """Create the queue entry and, if needed, give it an initial score.

        :param job: The `Job` this queue entry belongs to.
        :param job_type: A `BuildFarmJobType` item.
        :param estimated_duration: Expected run time of the job.
        :param virtualized: Virtualization flag of the job.
        :param processor: Processor the job is tied to.
        :param lastscore: Initial score; when None the score is computed
            via `score()`, provided a specific job exists.
        """
        super(BuildQueue, self).__init__(job_type=job_type, job=job,
            virtualized=virtualized, processor=processor,
            estimated_duration=estimated_duration, lastscore=lastscore)
        # No explicit score given: let the type-specific job compute one,
        # but only if such a job can be found.
        if lastscore is None and self.specific_job is not None:
            self.score()

    # The generic job record; mandatory.
    job = ForeignKey(dbName='job', foreignKey='Job', notNull=True)
    # Selects the class used to look up `specific_job`.
    job_type = EnumCol(
        enum=BuildFarmJobType, notNull=True,
        default=BuildFarmJobType.PACKAGEBUILD, dbName='job_type')
    # Set by markAsBuilding(), cleared by reset().
    builder = ForeignKey(dbName='builder', foreignKey='Builder', default=None)
    # Cleared by reset().
    logtail = StringCol(dbName='logtail', default=None)
    # Written by score() and manualScore().
    lastscore = IntCol(dbName='lastscore', default=0)
    # True once manualScore() was used; score() then leaves lastscore alone.
    manual = BoolCol(dbName='manual', default=False)
    estimated_duration = IntervalCol()
    # The dispatch estimation code treats a job without a processor as
    # processor-independent.
    processor = ForeignKey(dbName='processor', foreignKey='Processor')
    # May be NULL; see normalize_virtualization().
    virtualized = BoolCol(dbName='virtualized')

    @property
    def required_build_behavior(self):
        """See `IBuildQueue`."""
        # Adapt the type-specific job to its build farm job behavior.
        specific_job = self.specific_job
        return IBuildFarmJobBehavior(specific_job)

    @cachedproperty
    def specific_job(self):
        """See `IBuildQueue`."""
        # The job type selects the class; that class then looks up the
        # object belonging to our generic job.
        job_classes = specific_job_classes()
        return job_classes[self.job_type].getByJob(self.job)

    def _clear_specific_job_cache(self):
        """Drop the cached `specific_job` value of this queue entry."""
        property_cache = get_property_cache(self)
        del property_cache.specific_job

    @staticmethod
    def preloadSpecificJobData(queues):
        """Bulk-load the specific jobs of `queues` into their caches.

        The queue entries are processed in batches of consecutive entries
        sharing a job type.  For each batch the matching specific job
        class preloads its data and the resulting objects are stored as
        the cached `specific_job` of each entry.

        :param queues: An iterable of `BuildQueue` objects.  Note that
            `groupby` only merges *adjacent* entries, so input that is
            not ordered by job type yields several batches per type.
        """
        # The registry lookup does not depend on the batch; do it once.
        job_classes = specific_job_classes()
        by_job_type = attrgetter('job_type')
        for job_type, grouped_queues in groupby(queues, key=by_job_type):
            specific_class = job_classes[job_type]
            queue_subset = list(grouped_queues)
            # We need to preload the build farm jobs early to avoid
            # the call to _set_build_farm_job to look up BuildFarmBuildJobs
            # one by one.
            specific_class.preloadBuildFarmJobs(queue_subset)
            # Materialise the result exactly once: it is consumed several
            # times below, and a lazy result would either be re-fetched
            # on every pass or be exhausted after the first one.
            specific_jobs = list(specific_class.getByJobs(queue_subset))
            if not specific_jobs:
                continue
            specific_class.preloadJobsData(specific_jobs)
            specific_jobs_dict = dict(
                (specific_job.job, specific_job)
                for specific_job in specific_jobs)
            for queue in queue_subset:
                cache = get_property_cache(queue)
                cache.specific_job = specific_jobs_dict[queue.job]

    @property
    def date_started(self):
        """See `IBuildQueue`."""
        # The start date is kept on the generic job record.
        job = self.job
        return job.date_started

    @property
    def current_build_duration(self):
        """See `IBuildQueue`."""
        started = self.date_started
        if started is not None:
            return self._now() - started
        # A job without a start date has no duration yet.
        return None

    def destroySelf(self):
        """Delete this queue entry along with its job and specific job."""
        # Grab both related objects before the row itself is removed.
        generic_job = self.job
        type_specific_job = self.specific_job
        SQLBase.destroySelf(self)
        type_specific_job.cleanUp()
        generic_job.destroySelf()
        # The cached specific job is stale now.
        self._clear_specific_job_cache()

    def manualScore(self, value):
        """See `IBuildQueue`."""
        # Flag the entry as manually scored so that score() leaves the
        # given value alone from now on.
        self.manual = True
        self.lastscore = value

    def score(self):
        """See `IBuildQueue`."""
        specific_job = self.specific_job
        name = specific_job.getName()

        if not self.manual:
            # The `IBuildFarmJob` instance holds the data/logic needed to
            # work out the score for the job at hand.
            self.lastscore = specific_job.score()
            return

        # Manually scored entries keep their score; just take note of it
        # using whatever logger instance is available.
        logging.getLogger().debug(
            "%s (%d) MANUALLY RESCORED" % (name, self.lastscore))

    def getLogFileName(self):
        """See `IBuildQueue`."""
        # The log file name is determined by the type-specific
        # `IBuildFarmJob` instance.
        specific_job = self.specific_job
        return specific_job.getLogFileName()

    def markAsBuilding(self, builder):
        """See `IBuildQueue`."""
        self.builder = builder
        job = self.job
        # Only start the generic job if it is not running already.
        if job.status != JobStatus.RUNNING:
            job.start()
        self.specific_job.jobStarted()

    def reset(self):
        """See `IBuildQueue`."""
        self.builder = None
        job = self.job
        # Put the generic job back into the waiting state unless it is
        # there already.
        if job.status != JobStatus.WAITING:
            job.queue()
        # Forget everything recorded about the previous run.
        job.date_started = None
        job.date_finished = None
        self.logtail = None
        self.specific_job.jobReset()

    def cancel(self):
        """See `IBuildQueue`."""
        # Notify the type-specific job first, then remove all records.
        specific_job = self.specific_job
        specific_job.jobCancel()
        self.destroySelf()

    def setDateStarted(self, timestamp):
        """See `IBuildQueue`."""
        # The start date lives on the generic job record.
        job = self.job
        job.date_started = timestamp

    def _getFreeBuildersCount(self, processor, virtualized):
        """Count the idle builders suitable for the given platform.

        A builder counts when it is OK, not in manual mode, not referenced
        by any `BuildQueue` row and has the requested virtualization
        setting.

        :param processor: Restrict the count to builders with this
            processor; None means any processor.
        :param virtualized: The virtualization flag; None is treated like
            True.
        :return: The number of matching builders.
        """
        query_parts = ["""
            SELECT COUNT(id) FROM builder
            WHERE
                builderok = TRUE AND manual = FALSE
                AND id NOT IN (
                    SELECT builder FROM BuildQueue WHERE builder IS NOT NULL)
                AND virtualized = %s
            """ % sqlvalues(normalize_virtualization(virtualized))]
        if processor is not None:
            query_parts.append("""
                AND processor = %s
            """ % sqlvalues(processor))
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        count_row = store.execute("".join(query_parts)).get_one()
        return count_row[0]

    def _estimateTimeToNextBuilder(self):
        """Estimate the time until the next builder becomes available.

        To estimate the dispatch time of the job of interest (JOI) we have
        to know how long it takes until the job at the head of the JOI's
        queue gets dispatched.

        The head job is either

            - processor dependent: only builders with the matching
              processor/virtualization combination are considered.
            - *not* processor dependent: all builders with the matching
              virtualization setting are considered.

        :return: The estimated number of seconds until a builder capable
            of running the head job becomes available.
        """
        head_processor, head_virtualized = self._getHeadJobPlatform()

        # No delay at all while there are still free builders for the
        # head job's platform/virtualization combination.
        if self._getFreeBuildersCount(head_processor, head_virtualized) > 0:
            return 0

        now = self._now()
        delay_query = """
            SELECT MIN(
              CASE WHEN
                EXTRACT(EPOCH FROM
                  (BuildQueue.estimated_duration -
                   (((%s AT TIME ZONE 'UTC') - Job.date_started))))  >= 0
              THEN
                EXTRACT(EPOCH FROM
                  (BuildQueue.estimated_duration -
                   (((%s AT TIME ZONE 'UTC') - Job.date_started))))
              ELSE
                -- Assume that jobs that have overdrawn their estimated
                -- duration time budget will complete within 2 minutes.
                -- This is a wild guess but has worked well so far.
                --
                -- Please note that this is entirely innocuous i.e. if our
                -- guess is off nothing bad will happen but our estimate will
                -- not be as good as it could be.
                120
              END)
            FROM
                BuildQueue, Job, Builder
            WHERE
                BuildQueue.job = Job.id
                AND BuildQueue.builder = Builder.id
                AND Builder.manual = False
                AND Builder.builderok = True
                AND Job.status = %s
                AND Builder.virtualized = %s
            """ % sqlvalues(
                now, now, JobStatus.RUNNING,
                normalize_virtualization(head_virtualized))

        if head_processor is not None:
            # The head job is tied to a processor: restrict the search to
            # builders of that processor type.
            delay_query += """
                AND Builder.processor = %s
                """ % sqlvalues(head_processor)

        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        head_job_delay = store.execute(delay_query).get_one()[0]
        # MIN() yields NULL when no running job matched.
        if head_job_delay is None:
            return 0
        return int(head_job_delay)

    def _getPendingJobsClauses(self):
        """Build the WHERE clauses selecting pending jobs ahead of this one.

        The clauses are shared by the queries used for dispatch time
        estimation.

        :return: A string of SQL conditions over the BuildQueue and Job
            tables.
        """
        my_score = self.lastscore
        pending_clauses = """
            BuildQueue.job = Job.id
            AND Job.status = %s
            AND (
                -- The score must be either above my score or the
                -- job must be older than me in cases where the
                -- score is equal.
                BuildQueue.lastscore > %s OR
                (BuildQueue.lastscore = %s AND Job.id < %s))
            -- The virtualized values either match or the job
            -- does not care about virtualization and the job
            -- of interest (JOI) is to be run on a virtual builder
            -- (we want to prevent the execution of untrusted code
            -- on native builders).
            AND COALESCE(buildqueue.virtualized, TRUE) = %s
            """ % sqlvalues(
                JobStatus.WAITING, my_score, my_score, self.job,
                normalize_virtualization(self.virtualized))

        if self.processor is None:
            # Processors are irrelevant when estimating for a
            # processor-independent job.
            return pending_clauses

        return pending_clauses + """
            AND (
                -- The processor values either match or the candidate
                -- job is processor-independent.
                buildqueue.processor = %s OR
                buildqueue.processor IS NULL)
            """ % sqlvalues(self.processor)

    def _getHeadJobPlatform(self):
        """Find the processor and virtualization setting of the head job.

        Of all the jobs that compete with the job of interest (JOI) for
        builders and are queued ahead of it, the head job is the one in
        pole position, i.e. the one that gets dispatched next.

        :return: The head job's (processor, virtualized) row as fetched
            from the database or, when no job is queued ahead of the JOI,
            the JOI's own (processor id, normalized virtualized) tuple.
        """
        select_part = """
            SELECT
                processor,
                virtualized
            FROM
                BuildQueue, Job
            WHERE
            """
        order_part = """
            ORDER BY lastscore DESC, job LIMIT 1
            """
        query = select_part + self._getPendingJobsClauses() + order_part

        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        head_row = store.execute(query).get_one()
        if head_row is not None:
            return head_row

        # Nothing is ahead of the JOI; it is the head job itself.
        return (
            getattr(self.processor, 'id', None),
            normalize_virtualization(self.virtualized))

    def _estimateJobDelay(self, builder_stats):
        """Sum of estimated durations for *pending* jobs ahead in queue.

        For the purpose of estimating the dispatch time of the job of
        interest (JOI) we need to know the delay caused by all the pending
        jobs that are ahead of the JOI in the queue and that compete with it
        for builders.

        :param builder_stats: A dictionary with builder counts where the
            key is a (processor, virtualized) combination (aka "platform") and
            the value is the number of builders that can take on jobs
            requiring that combination.
        :return: An integer value holding the sum of delays (in seconds)
            caused by the jobs that are ahead of and competing with the JOI.
        """
        def jobs_compete_for_builders(a, b):
            """True if the two jobs compete for builders, False otherwise.

            Both arguments are (processor, virtualized) tuples.
            """
            a_processor, a_virtualized = a
            b_processor, b_virtualized = b
            if a_processor is None or b_processor is None:
                # If either of the jobs is platform-independent then the two
                # jobs compete for the same builders if the virtualization
                # settings match.
                return a_virtualized == b_virtualized
            # Neither job is platform-independent, match processor and
            # virtualization settings.
            return a == b

        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        my_platform = (
            getattr(self.processor, 'id', None),
            normalize_virtualization(self.virtualized))
        query = """
            SELECT
                BuildQueue.processor,
                BuildQueue.virtualized,
                COUNT(BuildQueue.job),
                CAST(EXTRACT(
                    EPOCH FROM
                        SUM(BuildQueue.estimated_duration)) AS INTEGER)
            FROM
                BuildQueue, Job
            WHERE
            """
        query += self._getPendingJobsClauses()
        query += """
            GROUP BY BuildQueue.processor, BuildQueue.virtualized
            """

        delays_by_platform = store.execute(query).get_all()

        # This will be used to capture per-platform delay totals.
        delays = defaultdict(int)
        # This will be used to capture per-platform job counts.
        job_counts = defaultdict(int)

        # Divide the estimated duration of the jobs as follows:
        #   - if a job is tied to a processor TP then divide the estimated
        #     duration of that job by the number of builders that target TP
        #     since only these can build the job.
        #   - if the job is processor-independent then divide its estimated
        #     duration by the total number of builders with the same
        #     virtualization setting because any one of them may run it.
        for processor, virtualized, job_count, delay in delays_by_platform:
            # NULL and TRUE virtualization flags end up in the same bucket.
            virtualized = normalize_virtualization(virtualized)
            platform = (processor, virtualized)
            builder_count = builder_stats.get(platform, 0)
            if builder_count == 0:
                # There is no builder that can run this job, ignore it
                # for the purpose of dispatch time estimation.
                continue

            if jobs_compete_for_builders(my_platform, platform):
                # The jobs that target the platform at hand compete with
                # the JOI for builders, add their delays.
                delays[platform] += delay
                job_counts[platform] += job_count

        sum_of_delays = 0
        # Now divide the delays based on a jobs/builders comparison.
        # `items()` is used instead of the Python-2-only `iteritems()`;
        # the dict holds at most one entry per platform.
        for platform, duration in delays.items():
            jobs = job_counts[platform]
            builders = builder_stats[platform]
            # If there are less jobs than builders that can take them on,
            # the delays should be averaged/divided by the number of jobs.
            denominator = min(jobs, builders)
            if denominator > 1:
                duration = int(duration / float(denominator))

            sum_of_delays += duration

        return sum_of_delays

    def getEstimatedJobStartTime(self):
        """See `IBuildQueue`.

        The estimated dispatch time for the build farm job at hand is
        calculated from the following ingredients:
            * the start time for the head job (job at the
              head of the respective build queue)
            * the estimated build durations of all jobs that
              precede the job of interest (JOI) in the build queue
              (divided by the number of machines in the respective
              build pool)

        :return: A timezone-aware datetime, or None when no builder can
            run this job.
        :raises AssertionError: if the job is not in the WAITING state.
        """
        # This method may only be invoked for pending jobs.
        if self.job.status != JobStatus.WAITING:
            raise AssertionError(
                "The start time is only estimated for pending jobs.")

        builder_stats = get_builder_data()
        # A NULL virtualization flag means "virtualized".  Without the
        # normalization a processor-bound job with a NULL flag would look
        # up (processor, None), a key that never appears in the builder
        # statistics, and wrongly get no estimate at all.
        platform = (
            getattr(self.processor, 'id', None),
            normalize_virtualization(self.virtualized))
        if builder_stats.get(platform, 0) == 0:
            # No builders that can run the job at hand
            #   -> no dispatch time estimation available.
            return None

        # Get the sum of the estimated run times for *pending* jobs that are
        # ahead of us in the queue.
        sum_of_delays = self._estimateJobDelay(builder_stats)

        # Get the minimum time duration until the next builder becomes
        # available.
        min_wait_time = self._estimateTimeToNextBuilder()

        # A job will not get dispatched in less than 5 seconds no matter what.
        start_time = max(5, min_wait_time + sum_of_delays)
        return self._now() + timedelta(seconds=start_time)

    @staticmethod
    def _now():
        """Return the current time as a UTC-aware datetime.

        Kept as a separate hook so that tests can override it.
        """
        utc = pytz.UTC
        return datetime.now(utc)


class BuildQueueSet(object):
    """Utility giving access to the set of `BuildQueue` entries."""
    implements(IBuildQueueSet)

    def __init__(self):
        self.title = "The Launchpad build queue"

    def __iter__(self):
        """See `IBuildQueueSet`."""
        all_entries = BuildQueue.select()
        return iter(all_entries)

    def __getitem__(self, buildqueue_id):
        """See `IBuildQueueSet`."""
        try:
            entry = BuildQueue.get(buildqueue_id)
        except SQLObjectNotFound:
            # Report unknown ids with the application-level error.
            raise NotFoundError(buildqueue_id)
        return entry

    def get(self, buildqueue_id):
        """See `IBuildQueueSet`."""
        # Unlike __getitem__ this lets SQLObjectNotFound propagate.
        return BuildQueue.get(buildqueue_id)

    def getByJob(self, job):
        """See `IBuildQueueSet`."""
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        matches = store.find(BuildQueue, BuildQueue.job == job)
        return matches.one()

    def count(self):
        """See `IBuildQueueSet`."""
        all_entries = BuildQueue.select()
        return all_entries.count()

    def getByBuilder(self, builder):
        """See `IBuildQueueSet`."""
        return BuildQueue.selectOneBy(builder=builder)

    def getActiveBuildJobs(self):
        """See `IBuildQueueSet`."""
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
        # The `!= None` comparisons build SQL expressions and must not be
        # turned into `is not None`.
        return store.find(
            BuildQueue,
            BuildQueue.job == Job.id,
            # XXX Michael Nelson 2010-02-22 bug=499421
            # Avoid corrupt build jobs where the builder is None.
            BuildQueue.builder != None,
            # status is a property. Let's use _status.
            Job._status == JobStatus.RUNNING,
            Job.date_started != None)