~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/canonical/codehosting/inmemory.py

merge stable fixing conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
from canonical.database.constants import UTC_NOW
19
19
from canonical.launchpad.database.branchnamespace import BranchNamespaceSet
20
 
from canonical.launchpad.database.branchtarget import ProductBranchTarget
 
20
from canonical.launchpad.database.branchtarget import (
 
21
    PackageBranchTarget, ProductBranchTarget)
21
22
from canonical.launchpad.interfaces.branch import BranchType, IBranch
22
23
from canonical.launchpad.interfaces.branchtarget import IBranchTarget
23
24
from canonical.launchpad.interfaces.codehosting import (
24
25
    BRANCH_TRANSPORT, CONTROL_TRANSPORT, LAUNCHPAD_ANONYMOUS,
25
26
    LAUNCHPAD_SERVICES)
 
27
from canonical.launchpad.interfaces.publishing import PackagePublishingPocket
26
28
from canonical.launchpad.testing import ObjectFactory
27
29
from canonical.launchpad.validators import LaunchpadValidationError
28
30
from canonical.launchpad.xmlrpc.codehosting import (
109
111
        self.sourcepackagename = sourcepackagename
110
112
        self.distroseries = distroseries
111
113
 
 
114
    def __hash__(self):
 
115
        return hash((self.sourcepackagename.id, self.distroseries.id))
 
116
 
 
117
    def __eq__(self, other):
 
118
        return (self.sourcepackagename.id == other.sourcepackagename.id
 
119
                and self.distroseries.id == other.distroseries.id)
 
120
 
 
121
    def __ne__(self, other):
 
122
        return not (self == other)
 
123
 
112
124
    @property
113
125
    def distribution(self):
114
126
        if self.distroseries is not None:
116
128
        else:
117
129
            return None
118
130
 
 
131
    @property
 
132
    def development_version(self):
 
133
        name = '%s-devel' % self.distribution.name
 
134
        dev_series = self._distroseries_set.getByName(name)
 
135
        if dev_series is None:
 
136
            dev_series = FakeDistroSeries(name, self.distribution)
 
137
            self._distroseries_set._add(dev_series)
 
138
        return self.__class__(self.sourcepackagename, dev_series)
 
139
 
 
140
    @property
 
141
    def path(self):
 
142
        return '%s/%s/%s' % (
 
143
            self.distribution.name,
 
144
            self.distroseries.name,
 
145
            self.sourcepackagename.name)
 
146
 
 
147
    def getBranch(self, pocket):
 
148
        return self.distroseries._linked_branches.get(
 
149
            (self, pocket), None)
 
150
 
 
151
    def setBranch(self, pocket, branch, registrant):
 
152
        self.distroseries._linked_branches[self, pocket] = branch
 
153
 
 
154
 
 
155
@adapter(FakeSourcePackage)
 
156
@implementer(IBranchTarget)
 
157
def fake_source_package_to_branch_target(fake_package):
 
158
    return PackageBranchTarget(fake_package)
 
159
 
119
160
 
120
161
class FakeBranch(FakeDatabaseObject):
121
162
    """Fake branch object."""
231
272
        """See `IProductSeries`."""
232
273
        return self.user_branch
233
274
 
 
275
    @property
 
276
    def series_branch(self):
 
277
        """See `IProductSeries`."""
 
278
        return self.user_branch
 
279
 
234
280
 
235
281
class FakeScriptActivity(FakeDatabaseObject):
236
282
    """Fake script activity."""
254
300
    def __init__(self, name, distribution):
255
301
        self.name = name
256
302
        self.distribution = distribution
 
303
        self._linked_branches = {}
257
304
 
258
305
 
259
306
class FakeSourcePackageName(FakeDatabaseObject):
311
358
    def makeAnyBranch(self, **kwargs):
312
359
        return self.makeProductBranch(**kwargs)
313
360
 
 
361
    def makePackageBranch(self, sourcepackage=None, **kwargs):
 
362
        if sourcepackage is None:
 
363
            sourcepackage = self.makeSourcePackage()
 
364
        return self.makeBranch(
 
365
            product=None, sourcepackage=sourcepackage, **kwargs)
 
366
 
314
367
    def makePersonalBranch(self, owner=None, **kwargs):
315
368
        if owner is None:
316
369
            owner = self.makePerson()
344
397
            distroseries = self.makeDistroRelease()
345
398
        if sourcepackagename is None:
346
399
            sourcepackagename = self.makeSourcePackageName()
347
 
        return FakeSourcePackage(sourcepackagename, distroseries)
 
400
        package = FakeSourcePackage(sourcepackagename, distroseries)
 
401
        package._distroseries_set = self._distroseries_set
 
402
        return package
348
403
 
349
404
    def makeTeam(self, owner):
350
405
        team = FakeTeam(name=self.getUniqueString(), members=[owner])
370
425
        """
371
426
        if branch is None:
372
427
            branch = self.makeBranch(product=product)
373
 
        branch._mirrored = True
374
428
        product.development_focus.user_branch = branch
375
429
        branch.last_mirrored = 'rev1'
376
430
        return branch
377
431
 
 
432
    def enableDefaultStackingForPackage(self, package, branch):
 
433
        """Give 'package' a default stacked-on branch.
 
434
 
 
435
        :param package: The package to give a default stacked-on branch to.
 
436
        :param branch: The branch that should be the default stacked-on
 
437
            branch.
 
438
        """
 
439
        package.development_version.setBranch(
 
440
            PackagePublishingPocket.RELEASE, branch, branch.owner)
 
441
        branch.last_mirrored = 'rev1'
 
442
        return branch
 
443
 
378
444
 
379
445
class FakeBranchPuller:
380
446
 
519
585
                return faults.NotFound(
520
586
                    "No such source package: '%s'."
521
587
                    % (data['sourcepackagename'],))
522
 
            sourcepackage = FakeSourcePackage(sourcepackagename, distroseries)
 
588
            sourcepackage = self._factory.makeSourcePackage(
 
589
                distroseries, sourcepackagename)
523
590
        else:
524
591
            return faults.PermissionDenied(
525
592
                "Cannot create branch at '%s'" % branch_path)
558
625
        person = self._person_set.get(person_id)
559
626
        return person.inTeam(branch.owner)
560
627
 
 
628
    def _get_product_target(self, path):
 
629
        try:
 
630
            owner_name, product_name = path.split('/')
 
631
        except ValueError:
 
632
            # Wrong number of segments -- can't be a product.
 
633
            return
 
634
        product = self._product_set.getByName(product_name)
 
635
        return product
 
636
 
 
637
    def _get_package_target(self, path):
 
638
        try:
 
639
            owner_name, distro_name, series_name, package_name = (
 
640
                path.split('/'))
 
641
        except ValueError:
 
642
            # Wrong number of segments -- can't be a package.
 
643
            return
 
644
        distro = self._distribution_set.getByName(distro_name)
 
645
        distroseries = self._distroseries_set.getByName(series_name)
 
646
        sourcepackagename = self._sourcepackagename_set.getByName(
 
647
            package_name)
 
648
        if None in (distro, distroseries, sourcepackagename):
 
649
            return
 
650
        return self._factory.makeSourcePackage(
 
651
            distroseries, sourcepackagename)
 
652
 
561
653
    def _serializeControlDirectory(self, requester, product_path,
562
654
                                   trailing_path):
563
 
        try:
564
 
            owner_name, product_name, bazaar = product_path.split('/')
565
 
        except ValueError:
566
 
            # Wrong number of segments -- can't be a product.
567
 
            return
568
 
        if bazaar != '.bzr':
569
 
            return
570
 
        product = self._product_set.getByName(product_name)
571
 
        if product is None:
572
 
            return
573
 
        default_branch = IBranchTarget(product).default_stacked_on_branch
 
655
        if not ('.bzr' == trailing_path or trailing_path.startswith('.bzr/')):
 
656
            return
 
657
        target = self._get_product_target(product_path)
 
658
        if target is None:
 
659
            target = self._get_package_target(product_path)
 
660
        if target is None:
 
661
            return
 
662
        default_branch = IBranchTarget(target).default_stacked_on_branch
574
663
        if default_branch is None:
575
664
            return
576
665
        if not self._canRead(requester, default_branch):
578
667
        return (
579
668
            CONTROL_TRANSPORT,
580
669
            {'default_stack_on': escape('/' + default_branch.unique_name)},
581
 
            '/'.join([bazaar, trailing_path]))
 
670
            trailing_path)
582
671
 
583
672
    def _serializeBranch(self, requester_id, branch, trailing_path):
584
673
        if not self._canRead(requester_id, branch):
642
731
            self._sourcepackagename_set, self._factory)
643
732
        sm = getSiteManager()
644
733
        sm.registerAdapter(fake_product_to_branch_target)
 
734
        sm.registerAdapter(fake_source_package_to_branch_target)
645
735
 
646
736
    def getFilesystemEndpoint(self):
647
737
        """See `LaunchpadDatabaseFrontend`.