~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/codehosting/tests/test_sftp.py

  • Committer: Jeroen Vermeulen
  • Date: 2011-12-22 09:05:46 UTC
  • mto: This revision was merged to the branch mainline in revision 14583.
  • Revision ID: jeroen.vermeulen@canonical.com-20111222090546-6ctytqkw7zhqq1xy
Lint.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
 
1
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
3
 
4
4
"""Tests for the transport-backed SFTP server implementation."""
34
34
    )
35
35
from lp.codehosting.sshserver.daemon import CodehostingAvatar
36
36
from lp.services.sshserver.sftp import FileIsADirectory
 
37
from lp.services.utils import file_exists
37
38
from lp.testing import TestCase
38
39
from lp.testing.factory import LaunchpadObjectFactory
39
40
 
53
54
        maybe_method = getattr(self._transport, name)
54
55
        if not callable(maybe_method):
55
56
            return maybe_method
 
57
 
56
58
        def defer_it(*args, **kwargs):
57
59
            return defer.maybeDeferred(maybe_method, *args, **kwargs)
 
60
 
58
61
        return mergeFunctionMetadata(maybe_method, defer_it)
59
62
 
60
63
 
183
186
        """Assert that calling functions fails with `sftp_code`."""
184
187
        deferred = defer.maybeDeferred(function, *args, **kwargs)
185
188
        deferred = assert_fails_with(deferred, filetransfer.SFTPError)
 
189
 
186
190
        def check_sftp_code(exception):
187
191
            self.assertEqual(sftp_code, exception.code)
188
192
            return exception
 
193
 
189
194
        return deferred.addCallback(check_sftp_code)
190
195
 
191
196
    def openFile(self, path, flags, attrs):
256
261
        filename = self.getPathSegment()
257
262
        deferred = self.openFile(
258
263
            filename, filetransfer.FXF_WRITE | filetransfer.FXF_TRUNC, {})
 
264
 
259
265
        def write_chunks(handle):
260
266
            deferred = handle.writeChunk(1, 'a')
261
267
            deferred.addCallback(lambda ignored: handle.writeChunk(2, 'a'))
262
268
            deferred.addCallback(lambda ignored: handle.close())
 
269
 
263
270
        deferred.addCallback(write_chunks)
264
271
        return deferred.addCallback(
265
272
            lambda ignored: self.assertFileEqual(chr(0) + 'aa', filename))
457
464
        filename = self.getPathSegment()
458
465
        self.build_tree_contents([(filename, 'bar')])
459
466
        deferred = self.sftp_server.removeFile(filename)
 
467
 
460
468
        def assertFileRemoved(ignored):
461
 
            self.failIfExists(filename)
 
469
            self.assertFalse(file_exists(filename))
 
470
 
462
471
        return deferred.addCallback(assertFileRemoved)
463
472
 
464
473
    def test_removeFileError(self):
470
479
    def test_removeFile_directory(self):
471
480
        # Errors in removeFile are translated into SFTPErrors.
472
481
        filename = self.getPathSegment()
473
 
        self.build_tree_contents([(filename+'/',)])
 
482
        self.build_tree_contents([(filename + '/',)])
474
483
        deferred = self.sftp_server.removeFile(filename)
475
484
        return assert_fails_with(deferred, filetransfer.SFTPError)
476
485
 
480
489
        new_filename = self.getPathSegment()
481
490
        self.build_tree_contents([(orig_filename, 'bar')])
482
491
        deferred = self.sftp_server.renameFile(orig_filename, new_filename)
 
492
 
483
493
        def assertFileRenamed(ignored):
484
 
            self.failIfExists(orig_filename)
485
 
            self.failUnlessExists(new_filename)
 
494
            self.assertFalse(file_exists(orig_filename))
 
495
            self.assertTrue(file_exists(new_filename))
 
496
 
486
497
        return deferred.addCallback(assertFileRenamed)
487
498
 
488
499
    def test_renameFileError(self):
497
508
        directory = self.getPathSegment()
498
509
        deferred = self.sftp_server.makeDirectory(
499
510
            directory, {'permissions': 0777})
 
511
 
500
512
        def assertDirectoryExists(ignored):
501
513
            self.assertTrue(
502
514
                os.path.isdir(directory), '%r is not a directory' % directory)
503
515
            self.assertEqual(040777, os.stat(directory).st_mode)
 
516
 
504
517
        return deferred.addCallback(assertDirectoryExists)
505
518
 
506
519
    def test_makeDirectoryError(self):
516
529
        directory = self.getPathSegment()
517
530
        os.mkdir(directory)
518
531
        deferred = self.sftp_server.removeDirectory(directory)
 
532
 
519
533
        def assertDirectoryRemoved(ignored):
520
 
            self.failIfExists(directory)
 
534
            self.assertFalse(file_exists(directory))
 
535
 
521
536
        return deferred.addCallback(assertDirectoryRemoved)
522
537
 
523
538
    def test_removeDirectoryError(self):
567
582
            '%s/%s/' % (parent_dir, child_dir),
568
583
            '%s/%s' % (parent_dir, child_file)])
569
584
        deferred = self.sftp_server.openDirectory(parent_dir)
 
585
 
570
586
        def check_entry(entries, filename):
571
587
            t = get_transport('.')
572
588
            stat = t.stat(urlutils.escape('%s/%s' % (parent_dir, filename)))
576
592
            name, longname, attrs = named_entries[0]
577
593
            self.assertEqual(lsLine(name, stat), longname)
578
594
            self.assertEqual(self.sftp_server._translate_stat(stat), attrs)
 
595
 
579
596
        def check_open_directory(directory):
580
597
            entries = list(directory)
581
598
            directory.close()
583
600
            self.assertEqual(set(names), set([child_dir, child_file]))
584
601
            check_entry(entries, child_dir)
585
602
            check_entry(entries, child_file)
 
603
 
586
604
        return deferred.addCallback(check_open_directory)
587
605
 
588
606
    def test_openDirectoryError(self):
597
615
        transport.put_bytes('hello', 'hello')
598
616
        sftp_server = TransportSFTPServer(AsyncTransport(transport))
599
617
        deferred = sftp_server.openDirectory('.')
 
618
 
600
619
        def check_directory(directory):
601
620
            with closing(directory):
602
621
                names = [entry[0] for entry in directory]
603
622
            self.assertEqual(['hello'], names)
 
623
 
604
624
        return deferred.addCallback(check_directory)
605
625
 
606
626
    def test__format_directory_entries_with_MemoryStat(self):