1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for pool.py."""
__metaclass__ = type
import hashlib
import os
import shutil
from tempfile import mkdtemp
import unittest
from lp.archivepublisher.diskpool import (
DiskPool,
poolify,
)
from lp.services.log.logger import BufferLogger
class MockFile:
    """In-memory stand-in for a file object that is consumed in chunks.

    The read position is only created by `open`, so `open` must be called
    before the first `read`.
    """

    def __init__(self, contents):
        # The complete payload that successive `read` calls slice from.
        self.contents = contents

    def open(self):
        """Reset the read position to the start of the contents."""
        self.loc = 0

    def read(self, chunksize):
        """Return the next slice of at most `chunksize` items.

        The position always advances by `chunksize`, so once the contents
        are exhausted every further call returns an empty slice.
        """
        start = self.loc
        self.loc = start + chunksize
        return self.contents[start:self.loc]

    def close(self):
        """Do nothing; there is no underlying resource to release."""
class PoolTestingFile:
    """Helper binding one (sourcename, filename) pair to a pool under test.

    The file's contents are simply the source name as bytes, which keeps
    sizes and checksums predictable in the tests.
    """

    def __init__(self, pool, sourcename, filename):
        """Remember the pool and the identity of the file.

        :param pool: object providing `addFile`, `removeFile` and `pathFor`.
        :param sourcename: source package name; also used as file contents.
        :param filename: name of the file inside the pool.
        """
        self.pool = pool
        self.sourcename = sourcename
        self.filename = filename
        # hashlib only accepts bytes on Python 3, so text is encoded here.
        # Byte strings (including every Python 2 `str`) pass through
        # untouched, so the contents are unchanged on Python 2.
        if isinstance(sourcename, bytes):
            self.contents = sourcename
        else:
            self.contents = sourcename.encode("UTF-8")

    def addToPool(self, component):
        """Add the file to `component`, returning the result of `addFile`.

        The SHA-1 hex digest of the contents is passed along with a
        `MockFile` wrapping the same contents.
        """
        return self.pool.addFile(
            component, self.sourcename, self.filename,
            hashlib.sha1(self.contents).hexdigest(), MockFile(self.contents))

    def removeFromPool(self, component):
        """Remove the file from `component`; return what `removeFile` does."""
        return self.pool.removeFile(component, self.sourcename, self.filename)

    def checkExists(self, component):
        """Return True if the file's path in `component` exists on disk."""
        path = self.pool.pathFor(component, self.sourcename, self.filename)
        return os.path.exists(path)

    def checkIsLink(self, component):
        """Return True if the file's path in `component` is a symlink."""
        path = self.pool.pathFor(component, self.sourcename, self.filename)
        return os.path.islink(path)

    def checkIsFile(self, component):
        """Return True if the path exists and is not a symlink."""
        return self.checkExists(component) and not self.checkIsLink(component)
class TestPoolification(unittest.TestCase):
    """Checks the pool-relative paths produced by `poolify`."""

    def testPoolificationOkay(self):
        """poolify should poolify properly"""
        # Each row is (source name, component, expected pool path).  The
        # last row covers a "lib"-prefixed name, whose expected prefix
        # directory is four characters long instead of one.
        expectations = (
            ("foo", "main", "main/f/foo"),
            ("foo", "universe", "universe/f/foo"),
            ("libfoo", "main", "main/libf/libfoo"),
        )
        for source, component, expected_path in expectations:
            self.assertEqual(expected_path, poolify(source, component))
class TestPool(unittest.TestCase):
    """Exercise adding and removing files in a `DiskPool` on real disk.

    Every test works on a fresh pool rooted in temporary directories, using
    `PoolTestingFile` to add, remove and inspect a single file.
    """

    def setUp(self):
        """Create scratch pool/temp directories and a DiskPool over them."""
        self.pool_path = mkdtemp()
        self.temp_path = mkdtemp()
        self.pool = DiskPool(self.pool_path, self.temp_path, BufferLogger())

    def tearDown(self):
        """Delete the scratch directories created by `setUp`."""
        shutil.rmtree(self.pool_path)
        shutil.rmtree(self.temp_path)

    def testSimpleAdd(self):
        """Adding a new file should work."""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        result = foo.addToPool("main")
        self.assertEqual(self.pool.results.FILE_ADDED, result)
        self.assertTrue(foo.checkIsFile("main"))

    def testSimpleSymlink(self):
        """Adding a file twice should result in a symlink."""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        foo.addToPool("main")
        result = foo.addToPool("universe")
        self.assertEqual(self.pool.results.SYMLINK_ADDED, result)
        # The first component keeps the real file; the second gets a link.
        self.assertTrue(foo.checkIsFile("main"))
        self.assertTrue(foo.checkIsLink("universe"))

    def testSymlinkShuffleOnAdd(self):
        """If the second add is a more preferred component, links shuffle."""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        foo.addToPool("universe")
        result = foo.addToPool("main")
        self.assertEqual(self.pool.results.SYMLINK_ADDED, result)
        # Although "universe" was added first, the real file is expected to
        # end up in "main", with "universe" demoted to a symlink.
        self.assertTrue(foo.checkIsFile("main"))
        self.assertTrue(foo.checkIsLink("universe"))

    def testRemoveSymlink(self):
        """Remove file should just remove a symlink"""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        foo.addToPool("main")
        foo.addToPool("universe")
        size = foo.removeFromPool("universe")
        self.assertFalse(foo.checkExists("universe"))
        # NOTE(review): 31 is presumably the size of the symlink itself
        # (the length of its relative target path) -- confirm against
        # DiskPool.removeFile.
        self.assertEqual(31, size)

    def testRemoveLoneFile(self):
        """Removing a file with no symlinks removes it."""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        foo.addToPool("main")
        size = foo.removeFromPool("main")
        # Check the component the file actually lived in; checking any
        # other component would pass even if the removal did nothing.
        self.assertFalse(foo.checkExists("main"))
        # PoolTestingFile uses the source name ("foo") as file contents,
        # hence three bytes.
        self.assertEqual(3, size)

    def testSymlinkShuffleOnRemove(self):
        """Removing a file with a symlink shuffles links."""
        foo = PoolTestingFile(self.pool, "foo", "foo-1.0.deb")
        foo.addToPool("universe")
        foo.addToPool("main")
        foo.removeFromPool("main")
        # With "main" gone, "universe" is expected to hold the real file
        # rather than a dangling symlink.
        self.assertFalse(foo.checkExists("main"))
        self.assertTrue(foo.checkIsFile("universe"))
|