1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for `CustomUploads`."""
# Python 2 idiom: classes declared in this module without an explicit base
# are created as new-style classes.
__metaclass__ = type
import cStringIO
import os
import shutil
import tarfile
import tempfile
import unittest
from lp.archivepublisher.customupload import (
CustomUpload,
CustomUploadTarballBadFile,
CustomUploadTarballBadSymLink,
CustomUploadTarballInvalidFileType,
)
from lp.testing import TestCase
class TestCustomUpload(unittest.TestCase):
    """Check `CustomUpload.fixCurrentSymlink` against a scratch directory."""

    def setUp(self):
        # Every test works inside its own throwaway directory.
        self.test_dir = tempfile.mkdtemp(prefix='archive_root_')

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def assertEntries(self, entries):
        """Assert that the sorted listing of the test dir equals `entries`."""
        found = os.listdir(self.test_dir)
        found.sort()
        self.assertEqual(entries, found)

    def _makeVersionDirs(self, *names):
        """Create one sub-directory of the test dir for each given name."""
        for name in names:
            os.mkdir(os.path.join(self.test_dir, name))

    def _currentTarget(self):
        """Return what the 'current' symlink in the test dir points to."""
        return os.readlink(os.path.join(self.test_dir, 'current'))

    def testFixCurrentSymlink(self):
        """Test `CustomUpload.fixCurrentSymlink` behaviour.

        Only the 3 newest entries named as valid versions are kept, and
        the 'current' symbolic link is created (or updated) so that it
        points to the latest entry.

        Entries that are not named as valid versions are left alone.
        """
        # A bare `CustomUpload` whose 'targetdir' is the directory created
        # for this test.
        uploader = CustomUpload(None, None, None)
        uploader.targetdir = self.test_dir

        # Start with 4 entries named as valid versions.
        self._makeVersionDirs('1.0', '1.1', '1.2', '1.3')
        self.assertEntries(['1.0', '1.1', '1.2', '1.3'])

        # The oldest entry is dropped and 'current' links to the highest
        # remaining version.
        uploader.fixCurrentSymlink()
        self.assertEntries(['1.1', '1.2', '1.3', 'current'])
        self.assertEqual('1.3', self._currentTarget())

        # An entry with an invalid version name is ignored (it was
        # probably put there manually); the link still follows the latest
        # valid version.
        self._makeVersionDirs('1.4', 'alpha-5')
        uploader.fixCurrentSymlink()
        self.assertEntries(['1.2', '1.3', '1.4', 'alpha-5', 'current'])
        self.assertEqual('1.4', self._currentTarget())
class TestTarfileVerification(TestCase):
    """Tests for the tarball checks `CustomUpload` runs before extracting.

    Most tests build a tar archive in memory and hand it straight to
    `verifyBeforeExtracting`; only `test_extract` writes one to disk.
    """

    def setUp(self):
        TestCase.setUp(self)
        # Use a unique scratch directory rather than a fixed, predictable
        # path under /tmp, so concurrent or interrupted runs cannot collide
        # with (or delete) each other's files.
        self.tarfile_path = tempfile.mkdtemp(prefix="_verify_extract_")
        self.addCleanup(shutil.rmtree, self.tarfile_path)
        self.tarfile_name = os.path.join(
            self.tarfile_path, "test_tarfile.tar")
        self.custom_processor = CustomUpload(None, self.tarfile_name, None)
        self.custom_processor.tmpdir = self.makeTemporaryDirectory()

    def createTarfile(self):
        """Return an open in-memory tar archive holding only a './' entry.

        Both the archive and its backing buffer are closed on cleanup.
        """
        self.tar_fileobj = cStringIO.StringIO()
        tar_file = tarfile.open(name=None, mode="w", fileobj=self.tar_fileobj)
        root_info = tarfile.TarInfo(name='./')
        root_info.type = tarfile.DIRTYPE
        tar_file.addfile(root_info)
        # Ordering matters here, addCleanup pushes onto a stack which is
        # popped in reverse order.
        self.addCleanup(self.tar_fileobj.close)
        self.addCleanup(tar_file.close)
        return tar_file

    def createSymlinkInfo(self, target, name="i_am_a_symlink"):
        """Return a `TarInfo` for a symlink called `name` -> `target`."""
        info = tarfile.TarInfo(name=name)
        info.type = tarfile.SYMTYPE
        info.linkname = target
        return info

    def createTarfileWithSymlink(self, target):
        """Return a fresh tar archive containing one symlink to `target`."""
        info = self.createSymlinkInfo(target)
        tar_file = self.createTarfile()
        tar_file.addfile(info)
        return tar_file

    def createTarfileWithFile(self, file_type, name="testfile"):
        """Return a fresh tar archive with one `file_type` member `name`."""
        info = tarfile.TarInfo(name=name)
        info.type = file_type
        tar_file = self.createTarfile()
        tar_file.addfile(info)
        return tar_file

    def assertFails(self, exception, tar_file):
        """Assert that verifying `tar_file` raises `exception`."""
        self.assertRaises(
            exception,
            self.custom_processor.verifyBeforeExtracting,
            tar_file)

    def assertPasses(self, tar_file):
        """Assert that verifying `tar_file` returns a true value."""
        result = self.custom_processor.verifyBeforeExtracting(tar_file)
        self.assertTrue(result)

    def testFailsToExtractBadSymlink(self):
        """Fail if a symlink's target is outside the tmp tree."""
        tar_file = self.createTarfileWithSymlink(target="/etc/passwd")
        self.assertFails(CustomUploadTarballBadSymLink, tar_file)

    def testFailsToExtractBadRelativeSymlink(self):
        """Fail if a symlink's relative target is outside the tmp tree."""
        tar_file = self.createTarfileWithSymlink(target="../outside")
        self.assertFails(CustomUploadTarballBadSymLink, tar_file)

    def testFailsToExtractBadFileType(self):
        """Fail if a file in a tarfile is not a regular file or a symlink."""
        tar_file = self.createTarfileWithFile(tarfile.FIFOTYPE)
        self.assertFails(CustomUploadTarballInvalidFileType, tar_file)

    def testFailsToExtractBadFileLocation(self):
        """Fail if the file resolves to a path outside the tmp tree."""
        tar_file = self.createTarfileWithFile(tarfile.REGTYPE, "../outside")
        self.assertFails(CustomUploadTarballBadFile, tar_file)

    def testFailsToExtractBadAbsoluteFileLocation(self):
        """Fail if an absolute file path lies outside the tmp tree."""
        tar_file = self.createTarfileWithFile(tarfile.REGTYPE, "/etc/passwd")
        self.assertFails(CustomUploadTarballBadFile, tar_file)

    def testRegularFileDoesntRaise(self):
        """Adding a normal file should pass inspection."""
        tar_file = self.createTarfileWithFile(tarfile.REGTYPE)
        self.assertPasses(tar_file)

    def testDirectoryDoesntRaise(self):
        """Adding a directory should pass inspection."""
        tar_file = self.createTarfileWithFile(tarfile.DIRTYPE)
        self.assertPasses(tar_file)

    def testSymlinkDoesntRaise(self):
        """Adding a symlink should pass inspection."""
        tar_file = self.createTarfileWithSymlink(target="something/blah")
        self.assertPasses(tar_file)

    def testRelativeSymlinkToRootDoesntRaise(self):
        """A symlink whose target is '.' should pass inspection."""
        tar_file = self.createTarfileWithSymlink(target=".")
        self.assertPasses(tar_file)

    def testRelativeSymlinkTargetInsideDirectoryDoesntRaise(self):
        """A '../' symlink inside a sub-directory should pass inspection."""
        tar_file = self.createTarfileWithFile(
            tarfile.DIRTYPE, name="testdir")
        info = self.createSymlinkInfo(
            name="testdir/symlink", target="../dummy")
        tar_file.addfile(info)
        self.assertPasses(tar_file)

    def test_extract(self):
        """Test that the extract method calls the verify function.

        This test is different from the previous tests in that it actually
        pokes a fake tar file on disk. This is slower, so it's only done
        once, here.
        """
        # Make a bad tarfile and poke it into the custom processor.
        # NOTE(review): tmpdir is reset presumably so that extract() sets up
        # its own working directory -- confirm against CustomUpload.extract.
        self.custom_processor.tmpdir = None
        tar_file = tarfile.open(self.tarfile_name, mode="w")
        try:
            info = tarfile.TarInfo("test_file")
            info.type = tarfile.FIFOTYPE
            tar_file.addfile(info)
        finally:
            # Close even if building the archive fails, so the file handle
            # is not leaked.
            tar_file.close()
        self.assertRaises(
            CustomUploadTarballInvalidFileType,
            self.custom_processor.extract)
|