1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Functional tests for process-death-row.py script.
See lib/canonical/launchpad/doc/deathrow.txt for more detailed tests
of the module functionality; here we just aim to test that the script
processes its arguments and handles dry-run correctly.
"""
# Python 2 idiom: classes defined in this module without an explicit
# metaclass are created as new-style classes.
__metaclass__ = type
import datetime
import os
import shutil
import subprocess
import sys
from tempfile import mkdtemp
from unittest import TestCase
import pytz
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from canonical.config import config
from canonical.testing.layers import LaunchpadZopelessLayer
from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.person import IPersonSet
from lp.soyuz.enums import PackagePublishingStatus
from lp.soyuz.model.publishing import SourcePackagePublishingHistory
class TestProcessDeathRow(TestCase):
    """Test the process-death-row.py script works properly.

    Every test runs the script in a subprocess against publishing
    records that `setUp` marked for removal, then inspects both the
    files on disk and the publishing records.
    """

    layer = LaunchpadZopelessLayer

    def runDeathRow(self, extra_args, distribution="ubuntutest"):
        """Run process-death-row.py, returning the result and output.

        :param extra_args: list of additional command-line arguments,
            appended after the default ones.
        :param distribution: distribution name passed with "-d".
        :return: a `(returncode, stdout, stderr)` tuple.

        The test fails if the script exits with a non-zero return code;
        the captured stderr is included in the failure message.
        """
        script = os.path.join(config.root, "scripts", "process-death-row.py")
        args = [
            sys.executable, script, "-v", "-d", distribution,
            "-p", self.primary_test_folder,
            ]
        args.extend(extra_args)
        process = subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        err_msg = ("process-deathrow returned %s:\n%s" %
                   (process.returncode, stderr))
        self.assertEqual(process.returncode, 0, err_msg)
        return (process.returncode, stdout, stderr)

    def setUp(self):
        """Set up for a test death row run."""
        self.setupPrimaryArchive()
        self.setupPPA()
        # Commit so script can see our publishing record changes.
        self.layer.txn.commit()

    def tearDown(self):
        """Clean up after ourselves."""
        # Always attempt the PPA cleanup, even when removing the primary
        # archive folder fails, so no test files leak into later tests.
        try:
            self.tearDownPrimaryArchive()
        finally:
            self.tearDownPPA()

    def setupPrimaryArchive(self):
        """Create pending removal publications in ubuntutest PRIMARY archive.

        Also places the respective content on disk, so it can be removed
        and verified.

        Sets `primary_pubrec_ids`, `primary_test_folder` and
        `primary_package_path` on the test instance.
        """
        ubuntutest = getUtility(IDistributionSet)["ubuntutest"]
        ut_alsautils = ubuntutest.getSourcePackage("alsa-utils")
        ut_alsautils_109a4 = ut_alsautils.getVersion("1.0.9a-4")
        primary_pubrecs = ut_alsautils_109a4.publishing_history
        self.primary_pubrec_ids = self.markPublishingForRemoval(
            primary_pubrecs)

        self.primary_test_folder = mkdtemp()
        package_folder = os.path.join(
            self.primary_test_folder, "main", "a", "alsa-utils")
        os.makedirs(package_folder)

        self.primary_package_path = os.path.join(
            package_folder, "alsa-utils_1.0.9a-4.dsc")
        self.writeContent(self.primary_package_path)

    def tearDownPrimaryArchive(self):
        """Remove the temporary folder created by `setupPrimaryArchive`."""
        shutil.rmtree(self.primary_test_folder)

    def setupPPA(self):
        """Create pending removal publications in the cprov and mark PPAs.

        Firstly, turn the cprov & mark PPAs into ubuntutest PPAs, since
        the ubuntu publish configuration is broken in the sampledata.

        Also create one respective file on disk, so it can be removed and
        verified.

        Sets `ppa_pubrec_ids`, `ppa_test_folder` and `ppa_package_path`
        on the test instance.
        """
        ubuntutest = getUtility(IDistributionSet)['ubuntutest']

        cprov = getUtility(IPersonSet).getByName('cprov')
        removeSecurityProxy(cprov.archive).distribution = ubuntutest
        ppa_pubrecs = cprov.archive.getPublishedSources('iceweasel')
        self.ppa_pubrec_ids = self.markPublishingForRemoval(ppa_pubrecs)

        mark = getUtility(IPersonSet).getByName('mark')
        removeSecurityProxy(mark.archive).distribution = ubuntutest
        ppa_pubrecs = mark.archive.getPublishedSources('iceweasel')
        self.ppa_pubrec_ids.extend(self.markPublishingForRemoval(ppa_pubrecs))

        # Fill one of the files in cprov PPA just to ensure that deathrow
        # will be able to remove it. The other files can remain missing
        # in order to test if deathrow can cope with not-found files.
        self.ppa_test_folder = os.path.join(
            config.personalpackagearchive.root, "cprov", cprov.archive.name)
        package_folder = os.path.join(
            self.ppa_test_folder, "ubuntutest/pool/main/i/iceweasel")
        os.makedirs(package_folder)
        self.ppa_package_path = os.path.join(
            package_folder, "iceweasel-1.0.dsc")
        self.writeContent(self.ppa_package_path)

    def tearDownPPA(self):
        """Remove the PPA folder created by `setupPPA`."""
        shutil.rmtree(self.ppa_test_folder)

    def writeContent(self, path, content="whatever"):
        """Write `content` to the file at `path`.

        :param path: path of the file to create or overwrite.
        :param content: text written to the file.
        """
        # The context manager closes the file even if the write fails.
        with open(path, "w") as f:
            f.write(content)

    def markPublishingForRemoval(self, pubrecs):
        """Mark the given publishing records for removal.

        Each record is set to SUPERSEDED, has its `dateremoved` cleared
        and gets a `scheduleddeletiondate` far in the past (1999-01-01
        UTC).

        :param pubrecs: iterable of publishing records to modify.
        :return: list with the IDs of the modified records.
        """
        pubrec_ids = []
        for pubrec in pubrecs:
            pubrec.status = PackagePublishingStatus.SUPERSEDED
            pubrec.dateremoved = None
            pubrec.scheduleddeletiondate = datetime.datetime(
                1999, 1, 1, tzinfo=pytz.UTC)
            pubrec_ids.append(pubrec.id)
        return pubrec_ids

    def probePublishingStatus(self, pubrec_ids, status):
        """Check if all source publishing records match the given status."""
        for pubrec_id in pubrec_ids:
            spph = SourcePackagePublishingHistory.get(pubrec_id)
            self.assertEqual(
                spph.status, status, "ID %s -> %s (expected %s)" % (
                spph.id, spph.status.title, status.title))

    def probeRemoved(self, pubrec_ids):
        """Check if all source publishing records were removed.

        A record counts as removed when its `dateremoved` is set and
        lies before the current time.
        """
        right_now = datetime.datetime.now(pytz.UTC)
        for pubrec_id in pubrec_ids:
            spph = SourcePackagePublishingHistory.get(pubrec_id)
            # Guard against None first: comparing None with a datetime
            # raises TypeError rather than producing a clean test failure.
            self.assertTrue(
                spph.dateremoved is not None
                and spph.dateremoved < right_now,
                "ID %s -> not removed" % (spph.id,))

    def probeNotRemoved(self, pubrec_ids):
        """Check if all source publishing records were not removed."""
        for pubrec_id in pubrec_ids:
            spph = SourcePackagePublishingHistory.get(pubrec_id)
            self.assertTrue(
                spph.dateremoved is None,
                "ID %s -> removed" % (spph.id,))

    def testDryRun(self):
        """Test we don't delete the file or change the db in dry run mode."""
        self.runDeathRow(["-n"])
        # Both files are still on disk.
        self.assertTrue(os.path.exists(self.primary_package_path))
        self.assertTrue(os.path.exists(self.ppa_package_path))

        # No publishing record was touched.
        self.probePublishingStatus(
            self.primary_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeNotRemoved(self.primary_pubrec_ids)
        self.probePublishingStatus(
            self.ppa_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeNotRemoved(self.ppa_pubrec_ids)

    def testWetRun(self):
        """Test we do delete the file and change the db in wet run mode."""
        self.runDeathRow([])
        # Only the primary archive file is gone; the PPA file remains.
        self.assertFalse(os.path.exists(self.primary_package_path))
        self.assertTrue(os.path.exists(self.ppa_package_path))

        # Only the primary archive records are marked as removed.
        self.probePublishingStatus(
            self.primary_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeRemoved(self.primary_pubrec_ids)
        self.probePublishingStatus(
            self.ppa_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeNotRemoved(self.ppa_pubrec_ids)

    def testPPARun(self):
        """Test we only work upon PPA."""
        self.runDeathRow(["--ppa"])
        # Only the PPA file is gone; the primary archive file remains.
        self.assertTrue(os.path.exists(self.primary_package_path))
        self.assertFalse(os.path.exists(self.ppa_package_path))

        # Only the PPA records are marked as removed.
        self.probePublishingStatus(
            self.primary_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeNotRemoved(self.primary_pubrec_ids)
        self.probePublishingStatus(
            self.ppa_pubrec_ids, PackagePublishingStatus.SUPERSEDED)
        self.probeRemoved(self.ppa_pubrec_ids)
|