~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/job/tests/test_job.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-06-09 10:49:32 UTC
  • mfrom: (7675.1190.7 db-bug-793382)
  • Revision ID: launchpad@pqm.canonical.com-20110609104932-ctzgwbfn80x1c9ll
[r=henninge][bug=793382] Bring requestUpgrades back to constant query count.

Show diffs side-by-side

added added

removed removed

Lines of Context:
8
8
 
9
9
import pytz
10
10
from storm.locals import Store
11
 
from zope.component import getUtility
12
11
 
13
12
from canonical.database.constants import UTC_NOW
14
 
from canonical.launchpad.webapp.interfaces import (
15
 
    DEFAULT_FLAVOR,
16
 
    IStoreSelector,
17
 
    MAIN_STORE,
18
 
    )
 
13
from canonical.launchpad.interfaces.lpstorm import IStore
19
14
from canonical.launchpad.webapp.testing import verifyObject
20
15
from canonical.testing.layers import ZopelessDatabaseLayer
21
16
from lp.services.job.interfaces.job import (
44
39
        job = Job()
45
40
        self.assertEqual(job.status, JobStatus.WAITING)
46
41
 
 
42
    def test_createMultiple_creates_requested_number_of_jobs(self):
 
43
        job_ids = list(Job.createMultiple(IStore(Job), 3))
 
44
        self.assertEqual(3, len(job_ids))
 
45
        self.assertEqual(3, len(set(job_ids)))
 
46
 
 
47
    def test_createMultiple_returns_valid_job_ids(self):
 
48
        job_ids = list(Job.createMultiple(IStore(Job), 3))
 
49
        store = IStore(Job)
 
50
        for job_id in job_ids:
 
51
            self.assertIsNot(None, store.get(Job, job_id))
 
52
 
 
53
    def test_createMultiple_sets_status_to_WAITING(self):
 
54
        store = IStore(Job)
 
55
        job = store.get(Job, Job.createMultiple(store, 1)[0])
 
56
        self.assertEqual(JobStatus.WAITING, job.status)
 
57
 
47
58
    def test_start(self):
48
59
        """Job.start should update the object appropriately.
49
60
 
214
225
    layer = ZopelessDatabaseLayer
215
226
 
216
227
    def _sampleData(self):
217
 
        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
218
 
        return list(store.execute(Job.ready_jobs))
 
228
        return list(IStore(Job).execute(Job.ready_jobs))
219
229
 
220
230
    def test_ready_jobs(self):
221
231
        """Job.ready_jobs should include new jobs."""