~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Tests for job-running facilities."""

import logging
import sys
from textwrap import dedent
from time import sleep

from testtools.matchers import MatchesRegex
from testtools.testcase import ExpectedException
import transaction
from zope.interface import implements

from canonical.config import config
from canonical.launchpad.webapp import errorlog
from canonical.testing.layers import LaunchpadZopelessLayer
from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
from lp.services.job.interfaces.job import (
    IRunnableJob,
    JobStatus,
    LeaseHeld,
    SuspendJobException,
    )
from lp.services.job.model.job import Job
from lp.services.job.runner import (
    BaseRunnableJob,
    JobCronScript,
    JobRunner,
    TwistedJobRunner,
    )
from lp.services.log.logger import (
    BufferLogger,
    DevNullLogger,
    )
from lp.testing import (
    TestCaseWithFactory,
    ZopeTestInSubProcess,
    )
from lp.testing.fakemethod import FakeMethod
from lp.testing.mail_helpers import pop_notifications


class NullJob(BaseRunnableJob):
    """A job that does nothing but append a string to a list."""

    implements(IRunnableJob)

    JOB_COMPLETIONS = []

    def __init__(self, completion_message, oops_recipients=None,
                 error_recipients=None):
        self.message = completion_message
        self.job = Job()
        self.oops_recipients = oops_recipients
        if self.oops_recipients is None:
            self.oops_recipients = []
        self.error_recipients = error_recipients
        if self.error_recipients is None:
            self.error_recipients = []

    def run(self):
        NullJob.JOB_COMPLETIONS.append(self.message)

    def getOopsRecipients(self):
        return self.oops_recipients

    def getOopsVars(self):
        return [('foo', 'bar')]

    def getErrorRecipients(self):
        return self.error_recipients

    def getOperationDescription(self):
        return 'appending a string to a list'


class RaisingJobException(Exception):
    """Raised by the RaisingJob when run."""


class RaisingJob(NullJob):
    """A job that raises when it runs."""

    def run(self):
        raise RaisingJobException(self.message)


class RaisingJobUserError(NullJob):
    """A job that raises a user error when it runs."""

    user_error_types = (RaisingJobException, )

    def run(self):
        raise RaisingJobException(self.message)


class RaisingJobRaisingNotifyOops(NullJob):
    """A job that raises when it runs, and when calling notifyOops."""

    def run(self):
        raise RaisingJobException(self.message)

    def notifyOops(self, oops):
        raise RaisingJobException('oops notifying oops')


class RaisingJobRaisingNotifyUserError(NullJob):
    """A job that raises when it runs, and when notifying user errors."""

    user_error_types = (RaisingJobException, )

    def run(self):
        raise RaisingJobException(self.message)

    def notifyUserError(self, error):
        raise RaisingJobException('oops notifying users')


class RetryError(Exception):
    pass


class RaisingRetryJob(NullJob):

    retry_error_types = (RetryError,)

    max_retries = 1

    def run(self):
        raise RetryError()


class TestJobRunner(TestCaseWithFactory):
    """Ensure JobRunner behaves as expected."""

    layer = LaunchpadZopelessLayer

    def makeTwoJobs(self):
        """Test fixture.  Create two jobs."""
        return NullJob("job 1"), NullJob("job 2")

    def test_runJob(self):
        """Ensure status is set to completed when a job runs to completion."""
        job_1, job_2 = self.makeTwoJobs()
        runner = JobRunner(job_1)
        runner.runJob(job_1)
        self.assertEqual(JobStatus.COMPLETED, job_1.job.status)
        self.assertEqual([job_1], runner.completed_jobs)

    def test_runAll(self):
        """Ensure runAll works in the normal case."""
        job_1, job_2 = self.makeTwoJobs()
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        self.assertEqual(JobStatus.COMPLETED, job_1.job.status)
        self.assertEqual(JobStatus.COMPLETED, job_2.job.status)
        msg1 = NullJob.JOB_COMPLETIONS.pop()
        msg2 = NullJob.JOB_COMPLETIONS.pop()
        self.assertEqual(msg1, "job 2")
        self.assertEqual(msg2, "job 1")
        self.assertEqual([job_1, job_2], runner.completed_jobs)

    def test_runAll_skips_lease_failures(self):
        """Ensure runAll skips jobs whose leases can't be acquired."""
        job_1, job_2 = self.makeTwoJobs()
        job_2.job.acquireLease()
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        self.assertEqual(JobStatus.COMPLETED, job_1.job.status)
        self.assertEqual(JobStatus.WAITING, job_2.job.status)
        self.assertEqual([job_1], runner.completed_jobs)
        self.assertEqual([job_2], runner.incomplete_jobs)
        self.assertEqual([], self.oopses)

    def test_runAll_reports_oopses(self):
        """When an error is encountered, report an oops and continue."""
        job_1, job_2 = self.makeTwoJobs()

        def raiseError():
            # Ensure that jobs which call transaction.abort work, too.
            transaction.abort()
            raise Exception('Fake exception.  Foobar, I say!')
        job_1.run = raiseError
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        self.assertEqual([], pop_notifications())
        self.assertEqual([job_2], runner.completed_jobs)
        self.assertEqual([job_1], runner.incomplete_jobs)
        self.assertEqual(JobStatus.FAILED, job_1.job.status)
        self.assertEqual(JobStatus.COMPLETED, job_2.job.status)
        oops = self.oopses[-1]
        self.assertIn('Fake exception.  Foobar, I say!', oops['tb_text'])
        self.assertEqual(["{'foo': 'bar'}"], oops['req_vars'].values())

    def test_oops_messages_used_when_handling(self):
        """Oops messages should appear even when exceptions are handled."""
        job_1, job_2 = self.makeTwoJobs()

        def handleError():
            reporter = errorlog.globalErrorUtility
            try:
                raise ValueError('Fake exception.  Foobar, I say!')
            except ValueError:
                reporter.raising(sys.exc_info())
        job_1.run = handleError
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        oops = self.oopses[-1]
        self.assertEqual(["{'foo': 'bar'}"], oops['req_vars'].values())

    def test_runAll_aborts_transaction_on_error(self):
        """runAll should abort the transaction on oops."""

        class DBAlterJob(NullJob):

            def __init__(self):
                super(DBAlterJob, self).__init__('')

            def run(self):
                self.job.log = 'hello'
                raise ValueError

        job = DBAlterJob()
        runner = JobRunner([job])
        runner.runAll()
        # If the transaction was committed, job.log == 'hello'.  If it was
        # aborted, it is None.
        self.assertIs(None, job.job.log)

    def test_runAll_mails_oopses(self):
        """Email interested parties about OOPses."""
        job_1, job_2 = self.makeTwoJobs()

        def raiseError():
            # Ensure that jobs which call transaction.abort work, too.
            transaction.abort()
            raise Exception('Fake exception.  Foobar, I say!')
        job_1.run = raiseError
        job_1.oops_recipients = ['jrandom@example.org']
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        (notification,) = pop_notifications()
        oops = self.oopses[-1]
        self.assertIn(
            'Launchpad encountered an internal error during the following'
            ' operation: appending a string to a list.  It was logged with id'
            ' %s.  Sorry for the inconvenience.' % oops['id'],
            notification.get_payload(decode=True))
        self.assertNotIn('Fake exception.  Foobar, I say!',
                         notification.get_payload(decode=True))
        self.assertEqual('Launchpad internal error', notification['subject'])

    def test_runAll_mails_user_errors(self):
        """User errors should be mailed out without oopsing.

        User errors are identified by the RunnableJob.user_error_types
        attribute.  They do not cause an oops to be recorded, and their
        error messages are mailed to interested parties verbatim.
        """
        job_1, job_2 = self.makeTwoJobs()

        class ExampleError(Exception):
            pass

        def raiseError():
            raise ExampleError('Fake exception.  Foobar, I say!')
        job_1.run = raiseError
        job_1.user_error_types = (ExampleError,)
        job_1.error_recipients = ['jrandom@example.org']
        runner = JobRunner([job_1, job_2])
        runner.runAll()
        self.assertEqual([], self.oopses)
        notifications = pop_notifications()
        self.assertEqual(1, len(notifications))
        body = notifications[0].get_payload(decode=True)
        self.assertEqual(
            'Launchpad encountered an error during the following operation:'
            ' appending a string to a list.  Fake exception.  Foobar, I say!',
            body)
        self.assertEqual(
            'Launchpad error while appending a string to a list',
            notifications[0]['subject'])

    def test_runAll_requires_IRunnable(self):
        """Supplied classes must implement IRunnableJob.

        If they don't, we get a TypeError.  If they do, then we get an
        AttributeError, because we don't actually implement the interface.
        """
        runner = JobRunner([object()])
        self.assertRaises(TypeError, runner.runAll)

        class Runnable:
            implements(IRunnableJob)
        runner = JobRunner([Runnable()])
        self.assertRaises(AttributeError, runner.runAll)

    def test_runJob_records_failure(self):
        """When a job fails, the failure needs to be recorded."""
        job = RaisingJob('boom')
        runner = JobRunner([job])
        self.assertRaises(RaisingJobException, runner.runJob, job)
        # Abort the transaction to confirm that the update of the job status
        # has been committed.
        transaction.abort()
        self.assertEqual(JobStatus.FAILED, job.job.status)

    def test_runJobHandleErrors_oops_generated(self):
        """The handle errors method records an oops for raised errors."""
        job = RaisingJob('boom')
        runner = JobRunner([job])
        runner.runJobHandleError(job)
        self.assertEqual(1, len(self.oopses))

    def test_runJobHandleErrors_user_error_no_oops(self):
        """If the job raises a user error, there is no oops."""
        job = RaisingJobUserError('boom')
        runner = JobRunner([job])
        runner.runJobHandleError(job)
        self.assertEqual(0, len(self.oopses))

    def test_runJob_raising_retry_error(self):
        """If a job raises a retry_error, it should be re-queued."""
        job = RaisingRetryJob('completion')
        runner = JobRunner([job])
        with self.expectedLog('Scheduling retry due to RetryError'):
            runner.runJob(job)
        self.assertEqual(JobStatus.WAITING, job.status)
        self.assertNotIn(job, runner.completed_jobs)
        self.assertIn(job, runner.incomplete_jobs)

    def test_runJob_exceeding_max_retries(self):
        """If a job exceeds maximum retries, it should raise normally."""
        job = RaisingRetryJob('completion')
        JobRunner([job]).runJob(job)
        self.assertEqual(JobStatus.WAITING, job.status)
        runner = JobRunner([job])
        with ExpectedException(RetryError, ''):
            runner.runJob(job)
        self.assertEqual(JobStatus.FAILED, job.status)
        self.assertNotIn(job, runner.completed_jobs)
        self.assertIn(job, runner.incomplete_jobs)

    def test_runJobHandleErrors_oops_generated_notify_fails(self):
        """A second oops is logged if the notification of the oops fails."""
        job = RaisingJobRaisingNotifyOops('boom')
        runner = JobRunner([job])
        runner.runJobHandleError(job)
        self.assertEqual(2, len(self.oopses))

    def test_runJobHandleErrors_oops_generated_user_notify_fails(self):
        """A second oops is logged if the notification of the oops fails.

        In this test case the error is a user expected error, so the
        notifyUserError is called, and in this case the notify raises too.
        """
        job = RaisingJobRaisingNotifyUserError('boom')
        runner = JobRunner([job])
        runner.runJobHandleError(job)
        self.assertEqual(1, len(self.oopses))

    def test_runJob_with_SuspendJobException(self):
        # A job that raises SuspendJobError should end up suspended.
        job = NullJob('suspended')
        job.run = FakeMethod(failure=SuspendJobException())
        runner = JobRunner([job])
        runner.runJob(job)

        self.assertEqual(JobStatus.SUSPENDED, job.status)
        self.assertNotIn(job, runner.completed_jobs)
        self.assertIn(job, runner.incomplete_jobs)


class StaticJobSource(BaseRunnableJob):

    @classmethod
    def iterReady(cls):
        if not cls.done:
            for index, args in enumerate(cls.jobs):
                yield cls.get(index)
        cls.done = True

    @classmethod
    def get(cls, index):
        args = cls.jobs[index]
        return cls(index, *args)


class StuckJob(StaticJobSource):
    """Simulation of a job that stalls."""
    implements(IRunnableJob)

    done = False

    # A list of jobs to run: id, lease_length, delay.
    #
    # For the first job, have a very long lease, so that it
    # doesn't expire and so we soak up the ZCML loading time.  For the
    # second job, have a short lease so we hit the timeout.
    jobs = [
        (10000, 0),
        (5, 30),
        ]

    def __init__(self, id, lease_length, delay):
        self.id = id
        self.lease_length = lease_length
        self.delay = delay
        self.job = Job()

    def __repr__(self):
        return '<StuckJob(%r, lease_length=%s, delay=%s)>' % (
            self.id, self.lease_length, self.delay)

    def acquireLease(self):
        return self.job.acquireLease(self.lease_length)

    def run(self):
        sleep(self.delay)


class ShorterStuckJob(StuckJob):
    """Simulation of a job that stalls."""

    jobs = [
        (10000, 0),
        (0.05, 30),
        ]


class InitialFailureJob(StaticJobSource):

    implements(IRunnableJob)

    jobs = [(True,), (False,)]

    has_failed = False

    done = False

    def __init__(self, id, fail):
        self.id = id
        self.job = Job()
        self.fail = fail

    def run(self):
        if self.fail:
            InitialFailureJob.has_failed = True
            raise ValueError('I failed.')
        else:
            if InitialFailureJob.has_failed:
                raise ValueError('Previous failure.')


class ProcessSharingJob(StaticJobSource):

    implements(IRunnableJob)

    jobs = [(True,), (False,)]

    initial_job_was_here = False

    done = False

    def __init__(self, id, first):
        self.id = id
        self.job = Job()
        self.first = first

    def run(self):
        if self.first:
            ProcessSharingJob.initial_job_was_here = True
        else:
            if not ProcessSharingJob.initial_job_was_here:
                raise ValueError('Different process.')


class MemoryHogJob(StaticJobSource):

    implements(IRunnableJob)

    jobs = [()]

    done = False

    memory_limit = 0

    def __init__(self, id):
        self.job = Job()
        self.id = id

    def run(self):
        self.x = '*' * (10 ** 6)


class NoJobs(StaticJobSource):

    done = False

    jobs = []


class LeaseHeldJob(StaticJobSource):

    implements(IRunnableJob)

    jobs = [()]

    done = False

    def __init__(self, id):
        self.job = Job()
        self.id = id

    def acquireLease(self):
        raise LeaseHeld()


class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):

    # Needs AMQP
    layer = LaunchpadZopelessLayer

    def setUp(self):
        super(TestTwistedJobRunner, self).setUp()
        # The test relies on _pythonpath being importable. Thus we need to add
        # a directory that contains _pythonpath to the sys.path. We can rely
        # on the root directory of the checkout containing _pythonpath.
        if config.root not in sys.path:
            sys.path.append(config.root)
            self.addCleanup(sys.path.remove, config.root)

    def test_timeout_long(self):
        """When a job exceeds its lease, an exception is raised.

        Unfortunately, timeouts include the time it takes for the zope
        machinery to start up, so we run a job that will not time out first,
        followed by a job that is sure to time out.
        """
        logger = BufferLogger()
        logger.setLevel(logging.INFO)
        # StuckJob is actually a source of two jobs. The first is fast, the
        # second slow.
        runner = TwistedJobRunner.runFromSource(
            StuckJob, 'branchscanner', logger)

        self.assertEqual(
            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
        self.oops_capture.sync()
        oops = self.oopses[0]
        self.assertEqual(
            ('TimeoutError', 'Job ran too long.'),
            (oops['type'], oops['value']))
        self.assertThat(logger.getLogBuffer(), MatchesRegex(
            dedent("""\
            INFO Running through Twisted.
            INFO Running StuckJob \(ID .*\).
            INFO Running StuckJob \(ID .*\).
            INFO Job resulted in OOPS: .*
            """)))

    def test_timeout_short(self):
        """When a job exceeds its lease, an exception is raised.

        Unfortunately, timeouts include the time it takes for the zope
        machinery to start up, so we run a job that will not time out first,
        followed by a job that is sure to time out.
        """
        logger = BufferLogger()
        logger.setLevel(logging.INFO)
        # StuckJob is actually a source of two jobs. The first is fast, the
        # second slow.
        runner = TwistedJobRunner.runFromSource(
            ShorterStuckJob, 'branchscanner', logger)
        self.oops_capture.sync()
        oops = self.oopses[0]
        self.assertEqual(
            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
        self.assertThat(
            logger.getLogBuffer(), MatchesRegex(
                dedent("""\
                INFO Running through Twisted.
                INFO Running ShorterStuckJob \(ID .*\).
                INFO Running ShorterStuckJob \(ID .*\).
                INFO Job resulted in OOPS: %s
                """) % oops['id']))
        self.assertEqual(('TimeoutError', 'Job ran too long.'),
                         (oops['type'], oops['value']))

    def test_previous_failure_gives_new_process(self):
        """Failed jobs cause their worker to be terminated.

        When a job fails, it's not clear whether its process can be safely
        reused for a new job, so we kill the worker.
        """
        logger = BufferLogger()
        runner = TwistedJobRunner.runFromSource(
            InitialFailureJob, 'branchscanner', logger)
        self.assertEqual(
            (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))

    def test_successful_jobs_share_process(self):
        """Successful jobs allow process reuse.

        When a job succeeds, we assume that its process can be safely reused
        for a new job, so we reuse the worker.
        """
        logger = BufferLogger()
        runner = TwistedJobRunner.runFromSource(
            ProcessSharingJob, 'branchscanner', logger)
        self.assertEqual(
            (2, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))

    def test_memory_hog_job(self):
        """A job with a memory limit will trigger MemoryError on excess."""
        logger = BufferLogger()
        logger.setLevel(logging.INFO)
        runner = TwistedJobRunner.runFromSource(
            MemoryHogJob, 'branchscanner', logger)
        self.assertEqual(
            (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
        self.assertIn('Job resulted in OOPS', logger.getLogBuffer())
        self.oops_capture.sync()
        self.assertEqual('MemoryError', self.oopses[0]['type'])

    def test_no_jobs(self):
        logger = BufferLogger()
        logger.setLevel(logging.INFO)
        runner = TwistedJobRunner.runFromSource(
            NoJobs, 'branchscanner', logger)
        self.assertEqual(
            (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))

    def test_lease_held_handled(self):
        """Jobs that raise LeaseHeld are handled correctly."""
        logger = BufferLogger()
        logger.setLevel(logging.DEBUG)
        runner = TwistedJobRunner.runFromSource(
            LeaseHeldJob, 'branchscanner', logger)
        self.assertIn('Could not acquire lease', logger.getLogBuffer())
        self.assertEqual(
            (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))


class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):

    layer = LaunchpadZopelessLayer

    def test_configures_oops_handler(self):
        """JobCronScript.main should configure the global error utility."""

        class DummyRunner:

            @classmethod
            def runFromSource(cls, source, dbuser, logger):
                expected_config = errorlog.ErrorReportingUtility()
                expected_config.configure('merge_proposal_jobs')
                # Check that the unique oops token was applied.
                self.assertEqual(
                    errorlog.globalErrorUtility.oops_prefix,
                    expected_config.oops_prefix)
                return cls()

            completed_jobs = []
            incomplete_jobs = []

        class JobCronScriptSubclass(JobCronScript):
            config_name = 'merge_proposal_jobs'
            source_interface = IUpdatePreviewDiffJobSource

            def __init__(self):
                super(JobCronScriptSubclass, self).__init__(
                    DummyRunner, test_args=[])
                self.logger = DevNullLogger()

        old_errorlog = errorlog.globalErrorUtility
        try:
            errorlog.globalErrorUtility = errorlog.ErrorReportingUtility()
            cronscript = JobCronScriptSubclass()
            cronscript.main()
        finally:
            errorlog.globalErrorUtility = old_errorlog

    def test_log_twisted_option_for_twisted_runner(self):
        """TwistedJobRunner creates --log-twisted flag."""
        jcs = JobCronScript(TwistedJobRunner, test_args=[])
        self.assertIsNot(None, getattr(jcs.options, 'log_twisted', None))

    def test_no_log_twisted_option_for_plain_runner(self):
        """JobRunner has no --log-twisted flag."""
        jcs = JobCronScript(JobRunner, test_args=[])
        self.assertIs(None, getattr(jcs.options, 'log_twisted', None))

    def test_log_twisted_flag(self):
        """--log-twisted sets JobCronScript.log_twisted True."""
        jcs = JobCronScript(TwistedJobRunner, test_args=['--log-twisted'])
        self.assertTrue(jcs.log_twisted)

    def test_no_log_twisted_flag(self):
        """No --log-twisted sets JobCronScript.log_twisted False."""
        jcs = JobCronScript(TwistedJobRunner, test_args=[])
        self.assertFalse(jcs.log_twisted)