~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/canonical/poppy/filesystem.py

  • Committer: Celso Providelo
  • Date: 2006-10-20 21:08:17 UTC
  • mto: (3691.445.1 ppa-poppy)
  • mto: This revision was merged to the branch mainline in revision 4274.
  • Revision ID: celso.providelo@canonical.com-20061020210817-40e4b8a5ec3be9d9
Add directory support for poppy: recursive mkdir & rmdir, ls/lsinfo/type, and protection for underneath directories. Also add support for automatic creation of directories that are 'CWD'ed into. Tested with dupload and dput.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
import glob
7
7
import os
8
8
import stat
 
9
import shutil
9
10
from zope.security.interfaces import Unauthorized
10
11
from zope.server.interfaces.ftp import IFileSystem
11
12
from zope.interface import implements
20
21
 
21
22
    def _full(self, path):
22
23
        """Returns the full path name (i.e. rootpath + path)"""
23
 
        return os.path.join(self.rootpath, path)
 
24
        full_path = os.path.join(self.rootpath, path)
 
25
        if not os.path.realpath(full_path).startswith(self.rootpath):
 
26
            raise OSError("Path not allowed:", path)
 
27
        return full_path
24
28
 
25
29
    def _sanitize(self, path):
26
30
        if path.startswith('/'):
27
31
            path = path[1:]
28
32
        path = os.path.normpath(path)
29
 
        if '/' in path:
30
 
            raise OSError("No access to anything outside CWD:", path)
31
 
        else:
32
 
            return path
 
33
        return path
33
34
 
34
35
    def type(self, path):
35
36
        """Return the file type at path
44
45
                return 'd'
45
46
            elif os.path.isfile(full_path):
46
47
                return 'f'
47
 
    
 
48
 
48
49
    def names(self, path, filter=None):
49
50
        """Return a sequence of the names in a directory
50
51
 
52
53
        the filter returns a true value.
53
54
        """
54
55
        path = self._sanitize(path)
55
 
        if path != ".":
56
 
            return []
57
 
        glob_files = glob.glob(os.path.join(self.rootpath, "*"))
 
56
        full_path = self._full(path)
 
57
        if not os.path.exists(full_path):
 
58
            raise OSError("Not exists:", path)
 
59
        glob_files = glob.glob(os.path.join(self.rootpath, path, "*"))
58
60
        prefix = "%s/" % self.rootpath
59
61
        files = []
60
62
        for filename in glob_files:
61
 
            if os.path.isfile(filename):
62
 
                filename = filename.replace(prefix, "")
63
 
                if not filter or filter(filename):
64
 
                    files += [filename]
 
63
            filename = filename.replace(prefix, "")
 
64
            filename = filename.replace("./", "")
 
65
            if not filter or filter(filename):
 
66
                files += [filename]
65
67
        return files
66
68
 
67
69
    def ls(self, path, filter=None):
68
70
        """Return a sequence of information objects."""
69
71
        path = self._sanitize(path)
70
 
        if path != ".":
71
 
            return []
72
 
        glob_files = glob.glob(os.path.join(self.rootpath, "*"))
 
72
        full_path = self._full(path)
 
73
        if not os.path.exists(full_path):
 
74
            raise OSError("Not exists:", path)
 
75
        glob_files = glob.glob(os.path.join(self.rootpath, path, "*"))
73
76
        prefix = "%s/" % self.rootpath
74
77
        infos = []
75
78
        for filename in glob_files:
76
 
            if os.path.isfile(filename):
77
 
                filename = filename.replace(prefix, "")
78
 
                if filter is None or filter(filename):
79
 
                    infos.append(self.lsinfo(filename))
 
79
            filename = filename.replace(prefix, "")
 
80
            filename = filename.replace("./", "")
 
81
            if not filter or filter(filename):
 
82
                infos.append(self.lsinfo(filename))
80
83
        return infos
81
84
 
82
85
    def readfile(self, path, outstream, start=0, end=None):
94
97
        """
95
98
        path = self._sanitize(path)
96
99
        full_path = self._full(path)
97
 
        if os.path.exists(full_path):
98
 
            if os.path.isdir(full_path):
99
 
                raise OSError("Is a directory:", path)
100
 
        else:
 
100
        if not os.path.exists(full_path):
101
101
            raise OSError("Not exists:", path)
102
 
        info = {"type": 'f',
103
 
                "owner_name": "upload",
 
102
 
 
103
        info = {"owner_name": "upload",
104
104
                "group_name": "upload",
105
 
                "nlinks": 1,
106
 
                "name": path}
 
105
                "name": path.split("/")[-1]}
 
106
 
107
107
        s = os.stat(full_path)
 
108
 
108
109
        info["owner_readable"] = bool(s[stat.ST_MODE] & stat.S_IRUSR)
109
110
        info["owner_writable"] = bool(s[stat.ST_MODE] & stat.S_IWUSR)
110
111
        info["owner_executable"] = bool(s[stat.ST_MODE] & stat.S_IXUSR)
114
115
        info["other_readable"] = bool(s[stat.ST_MODE] & stat.S_IROTH)
115
116
        info["other_writable"] = bool(s[stat.ST_MODE] & stat.S_IWOTH)
116
117
        info["other_executable"] = bool(s[stat.ST_MODE] & stat.S_IXOTH)
117
 
        info["mtime"] = datetime.datetime.fromtimestamp(self.mtime(path)) 
 
118
        info["mtime"] = datetime.datetime.fromtimestamp(self.mtime(path))
118
119
        info["size"] = self.size(path)
 
120
        info["type"] = self.type(path)
 
121
        info["nlinks"] = s[stat.ST_NLINK]
119
122
        return info
120
123
 
121
124
    def mtime(self, path):
133
136
            return os.path.getsize(full_path)
134
137
 
135
138
    def mkdir(self, path):
136
 
        """Create a directory.
137
 
 
138
 
        Not Implemented - see upload.txt.
139
 
        """
 
139
        """Create a directory."""
140
140
        path = self._sanitize(path)
 
141
        full_path = self._full(path)
 
142
        if os.path.exists(full_path):
 
143
            if os.path.isfile(full_path):
 
144
                raise OSError("File already exists:", path)
 
145
            elif os.path.isdir(full_path):
 
146
                raise OSError("Directory already exists:", path)
 
147
            raise OSError("OOPS, can't create:", path)
 
148
        else:
 
149
            # XXX check leaf !?!?!
 
150
            os.makedirs(full_path)
141
151
 
142
152
    def remove(self, path):
143
153
        """Remove a file."""
157
167
        Not Implemented - see upload.txt.
158
168
        """
159
169
        path = self._sanitize(path)
 
170
        full_path = self._full(path)
 
171
        if os.path.exists(full_path):
 
172
            shutil.rmtree(full_path)
 
173
        else:
 
174
            raise OSError("Not exists:", path)
160
175
 
161
176
    def rename(self, old, new):
162
177
        """Rename a file."""
187
202
        if os.path.exists(full_path):
188
203
            if os.path.isdir(full_path):
189
204
                raise OSError("Is a directory:", path)
 
205
        else:
 
206
            dirname = os.path.dirname(full_path)
 
207
            if dirname:
 
208
                if not os.path.exists(dirname):
 
209
                    os.makedirs(dirname)
 
210
 
190
211
        if start and start < 0:
191
212
            raise ValueError("Negative start argument:", start)
192
213
        if end and end < 0: