Soyuz Upload Test
-----------------

This test will:

 * Turn the poppy FTP server on
 * Upload packages
 * Check the result
 * Kill the FTP server
 * Import the gpg key for katie
 * Register the gpg key for katie
 * Register the katie user in the right team
 * Turn on the test keyserver
 * Include the non-free component in the database
 * Run process-upload.py
 * Check the result
 * Mark packages as ACCEPTED
 * Run process-accepted.py
 * Check the results
 * Clean up


Uploading Packages
------------------

First, let's create a temporary directory in which we'll put the
uploaded files.

    >>> import os
    >>> import tempfile
    >>> temp_dir = tempfile.mkdtemp()
    >>> incoming_dir = os.path.join(temp_dir, "incoming")
    >>> accepted_dir = os.path.join(temp_dir, "accepted")
    >>> rejected_dir = os.path.join(temp_dir, "rejected")
    >>> failed_dir = os.path.join(temp_dir, "failed")
    >>> os.mkdir(incoming_dir)

Now, let's create a subprocess running the poppy FTP server. It won't
call the upload processing tool; we'll do that ourselves in this test,
so that we can control what's going on.

    >>> from lp.poppy.tests.helpers import PoppyTestSetup, SoyuzUploadError
    >>> poppy = PoppyTestSetup(incoming_dir)
    >>> poppy.startPoppy()

Connect to the server and log in. We'll keep trying to connect until
the server dies or the connection succeeds.

    >>> import ftplib, socket
    >>> ftp = ftplib.FTP()
    >>> while True:
    ...     try:
    ...         reply = ftp.connect("localhost", 3421)
    ...     except socket.error:
    ...         if not poppy.alive:
    ...             raise SoyuzUploadError('Server can not start.')
    ...     else:
    ...         break

    >>> ftp.login("anonymous", "")
    '230 Login Successful.'
    >>> ftp.cwd("/")
    '250 CWD command successful.'

Good. Let's send all the packages we have in the test directory to the
poppy server, using a separate FTP session for each package set.

    >>> from canonical.config import config
    >>> from lp.archiveuploader.tagfiles import parse_tagfile
    >>> import glob
    >>> test_files_dir = os.path.join(config.root,
    ...                               "lib/lp/soyuz/scripts/"
    ...                               "tests/upload_test_files/")
    >>> changes = sorted(glob.glob(test_files_dir + "*.changes"))
    >>> sent_filenames = []
    >>> uploads = []
    >>> package_names = []

# XXX cprov 2006-01-25 bug=29645: poppy still shows weird behaviour
# during file transfer: it suddenly closes the connection due to
# inactivity. That's why we keep polling the 'ftp.sock' attribute and
# reconnect if it is gone.

    >>> for changes_filepath in changes:
    ...
    ...     if not ftp.sock:
    ...         assert ftp.connect("localhost", 3421).startswith("220 ")
    ...         assert ftp.login("anonymous", "") == '230 Login Successful.'
    ...
    ...     tf = parse_tagfile(changes_filepath)
    ...
    ...     if tf.has_key("Source"):
    ...         package_names.append(tf["Source"])
    ...
    ...     send_filepaths = [changes_filepath]
    ...     if tf.has_key("Files"):
    ...         send_filepaths.extend(
    ...             [os.path.join(test_files_dir, line.split()[-1])
    ...              for line in tf["Files"].splitlines() if line])
    ...
    ...     sent_filenames.extend(
    ...         os.path.basename(filepath) for filepath in send_filepaths)
    ...
    ...     ignore = ftp.cwd("ubuntutest")
    ...
    ...     for filepath in send_filepaths:
    ...         # Open in binary mode, since these are binary transfers.
    ...         reply = ftp.storbinary(
    ...             "STOR %s" % os.path.basename(filepath),
    ...             open(filepath, "rb"))
    ...         assert reply == '226 Transfer successful.'
    ...
    ...     uploads.append(send_filepaths)
    ...
    ...     assert ftp.quit() == '221 Goodbye.'

Check that what we've just uploaded (everything in test_files_dir) is
what we were expecting to have uploaded.

    >>> package_names
    ['drdsl', 'etherwake']

We create a set of the filenames we expect to see in the FTP server
process output, and wait until all of them have been shown. This is a
little bit tricky: we won't simply read the process output in a
blocking way, since any failure in the FTP process would block the
automated tests. Instead, we define a timeout between chunks of output
data; if the process doesn't provide new data within the given number
of seconds, we report a failure.

    >>> poppy.verify_output(
    ...     ['ubuntutest', 'ubuntutest', 'ubuntutest'])
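The timeout logic inside the helper amounts to something like the
following sketch (illustrative only; the function name is hypothetical
and the real implementation lives in PoppyTestSetup):

    import select

    def readline_with_timeout(stream, timeout=60):
        """Read one line of server output, failing if none arrives in time."""
        # select() returns an empty "ready" list once the timeout expires
        # without any new data appearing on the pipe.
        ready, _, _ = select.select([stream], [], [], timeout)
        if not ready:
            raise SoyuzUploadError('Timed out waiting for poppy output.')
        return stream.readline()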
At that point we should have a bunch of directories in the upload base
directory named -XXXXXX, each one the result of an FTP session. Below
we ensure that, and also that the content of these files matches the
uploaded ones.

    >>> import hashlib

    >>> def get_md5(filename):
    ...     """Return the MD5 digest of the file's contents."""
    ...     return hashlib.md5(open(filename, "rb").read()).digest()

    >>> def get_upload_dir(num, dir=incoming_dir):
    ...     """Return the path to the upload, if found in the dir."""
    ...     for upload_dir in os.listdir(dir):
    ...         if upload_dir.endswith("%06d" % num):
    ...             return os.path.join(dir, upload_dir)
    ...     return None

    >>> def find_upload_dir(num):
    ...     """Return a tuple (result, path) for the numbered upload."""
    ...     for name, dir in (("incoming", incoming_dir),
    ...             ("accepted", accepted_dir), ("rejected", rejected_dir),
    ...             ("failed", failed_dir)):
    ...         result = get_upload_dir(num, dir)
    ...         if result is not None:
    ...             return (name, result)
    ...     return (None, None)

    >>> def find_upload_dir_result(num):
    ...     """Return the result for the numbered upload."""
    ...     return find_upload_dir(num)[0]

    >>> def find_upload_dir_path(num):
    ...     """Return the path of the numbered upload."""
    ...     return find_upload_dir(num)[1]

    >>> for i, sent_filenames in enumerate(uploads):
    ...     upload_dir = get_upload_dir(i + 1)
    ...     distro_upload_dir = os.path.join(upload_dir, 'ubuntutest')
    ...     assert len(os.listdir(distro_upload_dir)) == len(sent_filenames)
    ...     for filename in sent_filenames:
    ...         upload_filename = os.path.join(distro_upload_dir,
    ...                                        os.path.basename(filename))
    ...         assert os.path.isfile(upload_filename)
    ...         assert get_md5(filename) == get_md5(upload_filename)

Right, that's all we need from the FTP server, so we can kill the
process now.

    >>> status = poppy.killPoppy()

Finally, create an entirely empty upload folder. Our tests rely on a
poppy-like naming scheme, i.e. that the upload folder name ends with
000004 (this being our fourth upload).

    >>> os.mkdir("%s/fake_upload_000004" % incoming_dir)


Processing Uploads
------------------

Before asking the system to process the upload, we must prepare the
database to receive it. This consists mainly of adding the katie user,
since that's the email used in the Changed-By field of the .changes
files we are going to process, and the ftpmaster@canonical.com GPG key,
since that's the one used to sign the .changes files. We don't have to
check the .dsc file, since we're using the 'sync' policy in
process-upload.py.

# XXX: gustavo 2005-12-10
# It might be interesting to move these entries into the sample data
# rather than leaving them here. On the other hand, it's nice to have
# them here, as they are a good reference of what the uploading
# procedure depends upon.
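For reference, the Changed-By address that has to belong to a known
user can be read straight off a .changes file with the same
parse_tagfile helper used earlier. A minimal sketch, not executed as
part of this test (the field key is assumed to match the spelling used
in the .changes file):

    tf = parse_tagfile(changes[0])
    # The address process-upload.py will look up in the person set;
    # for our test files this should be katie's address.
    changed_by = tf["Changed-By"]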
So, load the GPG key:

    >>> from zope.component import getUtility
    >>> from canonical.launchpad.ftests.keys_for_tests import gpgkeysdir
    >>> from canonical.launchpad.interfaces.gpghandler import IGPGHandler
    >>> gpg_handler = getUtility(IGPGHandler)
    >>> key_path = os.path.join(gpgkeysdir, 'ftpmaster@canonical.com.pub')
    >>> key_data = open(key_path).read()
    >>> key = gpg_handler.importPublicKey(key_data)
    >>> assert key is not None
    >>> print key.fingerprint
    33C0A61893A5DC5EB325B29E415A12CAC2F30234

Create the katie user and register it in a team that is allowed to do
uploads:

    >>> from canonical.launchpad.interfaces.emailaddress import (
    ...     IEmailAddressSet)
    >>> from lp.registry.interfaces.gpg import (
    ...     GPGKeyAlgorithm,
    ...     IGPGKeySet,
    ...     )
    >>> from lp.registry.interfaces.person import (
    ...     IPersonSet,
    ...     PersonCreationRationale,
    ...     )

    >>> name, address = "Katie", "katie@rockhopper.ubuntu.com"
    >>> user = getUtility(IPersonSet).ensurePerson(
    ...     address, name, PersonCreationRationale.OWNER_CREATED_LAUNCHPAD)
    >>> assert user is not None

    >>> email = getUtility(IEmailAddressSet).getByEmail(address)
    >>> user.validateAndEnsurePreferredEmail(email)

    >>> uploader_team = getUtility(IPersonSet).getByName("ubuntu-team")
    >>> assert uploader_team is not None

    >>> login("foo.bar@canonical.com")
    >>> unused = uploader_team.addMember(
    ...     user, reviewer=uploader_team.teamowner)
    >>> login("test@canonical.com")

Assign the loaded GPG key to the katie user:

    >>> key_set = getUtility(IGPGKeySet)
    >>> user_key = key_set.new(
    ...     ownerID=user.id, keyid=key.keyid,
    ...     fingerprint=key.fingerprint,
    ...     algorithm=GPGKeyAlgorithm.items[key.algorithm],
    ...     keysize=key.keysize, can_encrypt=key.can_encrypt,
    ...     active=True)

Now we want to turn on the test keyserver to provide the key we just
imported. Remember that process-upload.py runs as a separate process.

    >>> from lp.testing.keyserver import KeyServerTac
    >>> keyserver = KeyServerTac()
    >>> keyserver.setUp()

Include non-free and contrib in the database. In the future this will
be done by NascentUpload in the 'sync' policy.

    >>> from lp.soyuz.interfaces.component import IComponentSet
    >>> component_set = getUtility(IComponentSet)
    >>> non_free = component_set.new("non-free")
    >>> contrib = component_set.new("contrib")
    >>> import transaction
    >>> transaction.commit()

Now we are ready to process the uploaded packages. This is done by
running process-upload.py on each upload directory.

    >>> import subprocess, sys
    >>> script = os.path.join(config.root, "scripts/process-upload.py")

First, we test process-upload's -J option, which limits which uploads
should be processed. We do this by locating upload number 1 and
initially processing just that one.

    >>> upload_dir_1_path = get_upload_dir(1)
    >>> upload_dir_1_name = os.path.basename(upload_dir_1_path)
    >>> process = subprocess.Popen([sys.executable, script, "--no-mails",
    ...                             "-vv", "-C", "sync",
    ...                             "-J", upload_dir_1_name, temp_dir],
    ...                            stdout=subprocess.PIPE,
    ...                            stderr=subprocess.PIPE)
    >>> stdout, stderr = process.communicate()
    >>> process.returncode
    0

Check that the four uploads are all where we expect them - number 1 in
failed, the other three still in incoming.

    >>> for i in range(4):
    ...     find_upload_dir_result(i + 1)
    'failed'
    'incoming'
    'incoming'
    'incoming'
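Conceptually, -J simply restricts the walk over the upload queue to a
single named leaf directory. A rough sketch of that selection, with a
hypothetical helper name (the real logic lives inside
process-upload.py's queue machinery):

    def select_upload_leaves(fsroot, leafname=None):
        """Yield upload directories in fsroot, optionally only the named one."""
        for leaf in sorted(os.listdir(fsroot)):
            if leafname is not None and leaf != leafname:
                continue
            yield os.path.join(fsroot, leaf)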
Now continue with the real upload.

    >>> process = subprocess.Popen([sys.executable, script, "--no-mails",
    ...                             "-vv", "-C", "sync", temp_dir],
    ...                            stdout=subprocess.PIPE,
    ...                            stderr=subprocess.PIPE)
    >>> stdout, stderr = process.communicate()
    >>> if process.returncode != 0:
    ...     print stdout
    ...     print stderr

Let's check if the packages were uploaded correctly.

    >>> from lp.registry.model.sourcepackagename import SourcePackageName
    >>> from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease
    >>> from pprint import pprint

    >>> spn = SourcePackageName.selectOneBy(name="drdsl")
    >>> spn.name
    u'drdsl'
    >>> spr = SourcePackageRelease.selectOneBy(sourcepackagenameID=spn.id)
    >>> spr.title
    u'drdsl - 1.2.0-0ubuntu1'
    >>> spr.name
    u'drdsl'
    >>> spr.version
    u'1.2.0-0ubuntu1'
    >>> spr.component.name
    u'multiverse'
    >>> spr.section.name
    u'comm'
    >>> spr.maintainer.displayname
    u'Matthias Klose'
    >>> pprint(sorted([sprf.libraryfile.filename for sprf in spr.files]))
    [u'drdsl_1.2.0-0ubuntu1.diff.gz',
     u'drdsl_1.2.0-0ubuntu1.dsc',
     u'drdsl_1.2.0.orig.tar.gz']
    >>> spr.format.name
    'DPKG'
    >>> spr.urgency.name
    'LOW'
    >>> spr.upload_distroseries.name
    u'breezy-autotest'

Same thing for etherwake:

    >>> spn = SourcePackageName.selectOneBy(name="etherwake")
    >>> spn.name
    u'etherwake'
    >>> spr = SourcePackageRelease.selectOneBy(sourcepackagenameID=spn.id)
    >>> spr.title
    u'etherwake - 1.08-1'
    >>> spr.name
    u'etherwake'
    >>> spr.version
    u'1.08-1'
    >>> spr.component.name
    u'universe'
    >>> spr.section.name
    u'net'
    >>> spr.maintainer.displayname
    u'Alain Schroeder'
    >>> pprint(sorted([sprf.libraryfile.filename for sprf in spr.files]))
    [u'etherwake_1.08-1.diff.gz',
     u'etherwake_1.08-1.dsc',
     u'etherwake_1.08.orig.tar.gz']
    >>> spr.format.name
    'DPKG'
    >>> spr.urgency.name
    'LOW'
    >>> spr.upload_distroseries.name
    u'breezy-autotest'

Check that the four uploads all ended up where we expected.

    >>> for i in range(0, 4):
    ...     find_upload_dir_result(i + 1)
    'failed'
    'failed'

Also check that the upload folders still contain all the files we
uploaded.

# XXX cprov 2006-12-06: the hardcoded 'ubuntutest' directory is a hack;
# see above around line 313.

    >>> for i, sent_filenames in enumerate(uploads):
    ...     upload_dir = find_upload_dir_path(i + 1)
    ...     if upload_dir is None:
    ...         continue
    ...     distro_upload_dir = os.path.join(upload_dir, 'ubuntutest')
    ...     assert len(os.listdir(distro_upload_dir)) == len(sent_filenames)
    ...     for filename in sent_filenames:
    ...         upload_filename = os.path.join(distro_upload_dir,
    ...                                        os.path.basename(filename))
    ...         assert os.path.isfile(upload_filename)
    ...         assert get_md5(filename) == get_md5(upload_filename)

Now let's see if all of the valid uploads are in the upload queue,
marked as NEW and RELEASE.

    >>> from lp.soyuz.model.queue import PackageUploadSource
    >>> for name in package_names:
    ...     print name
    ...     spn = SourcePackageName.selectOneBy(name=name)
    ...     spr = SourcePackageRelease.selectOneBy(sourcepackagenameID=spn.id)
    ...     us = PackageUploadSource.selectOneBy(sourcepackagereleaseID=spr.id)
    ...     assert us.packageupload.status.name == 'NEW'
    ...     assert us.packageupload.pocket.name == 'RELEASE'
    drdsl
    etherwake


Processing NEW Items
--------------------

The processing of NEW queue entries checks the integrity of the upload
candidates and promotes them to ACCEPTED; the failures are kept as NEW.

    >>> from lp.registry.interfaces.distribution import IDistributionSet
    >>> from lp.soyuz.enums import PackageUploadStatus
    >>> from lp.soyuz.interfaces.queue import (
    ...     QueueInconsistentStateError)
Since we landed correct security adapters for Upload, we need to
perform the following actions logged in as an admin, who has
launchpad.Edit on the records:

    >>> from canonical.launchpad.ftests import login
    >>> login("foo.bar@canonical.com")

    >>> distro = getUtility(IDistributionSet).getByName('ubuntutest')
    >>> series = distro['breezy-autotest']

Let's test IHasQueueItems.getQueueItems:

    >>> new_items = series.getQueueItems(PackageUploadStatus.NEW)
    >>> new_items.count()
    2

Querying by status and a name term:

    >>> items = series.getQueueItems(PackageUploadStatus.NEW,
    ...                              name='dr')
    >>> items.count()
    1
    >>> items[0].sources[0].sourcepackagerelease.name
    u'drdsl'
    >>> items[0].sources[0].sourcepackagerelease.version
    u'1.2.0-0ubuntu1'

Querying by status, name and version terms:

    >>> items = series.getQueueItems(PackageUploadStatus.NEW,
    ...                              name='dr', version='1.2')
    >>> items.count()
    1
    >>> items = series.getQueueItems(PackageUploadStatus.NEW,
    ...                              name='dr', version='1.5')
    >>> items.count()
    0

Using the exact_match argument. As you can see, exact_match affects
both name and version:

# XXX cprov 2006-01-25 bug=29642:
# Andrew suggests we split the exact_match attribute in two, as
# exact_name & exact_version, which might be a good idea, since it
# produces more controllable behaviour.

    >>> items = series.getQueueItems(PackageUploadStatus.NEW,
    ...                              name='dr', version='1.2',
    ...                              exact_match=True)
    >>> items.count()
    0
    >>> items = series.getQueueItems(PackageUploadStatus.NEW,
    ...                              name='drdsl',
    ...                              version='1.2.0-0ubuntu1',
    ...                              exact_match=True)
    >>> items.count()
    1

Use getQueueItems to inspect the current NEW queue and accept the
items:

    >>> queue_items = series.getQueueItems(PackageUploadStatus.NEW)
    >>> L = []
    >>> for queue_item in queue_items:
    ...     try:
    ...         queue_item.setAccepted()
    ...     except QueueInconsistentStateError, e:
    ...         L.append("%s %s" % (queue_item.sourcepackagerelease.name, e))
    ...     else:
    ...         L.append("%s %s" % (queue_item.sourcepackagerelease.name,
    ...                             'ACCEPTED'))
    >>> L.sort()
    >>> print "\n".join(L)
    drdsl ACCEPTED
    etherwake ACCEPTED

# XXX cprov 2006-04-12 bug=3989:
# We must flush these changes so that they get out of the cache and
# into the database; without this, process-accepted.py wouldn't see
# the changes (reported in bug 3989).

    >>> from canonical.database.sqlbase import flush_database_updates
    >>> flush_database_updates()
    >>> transaction.commit()

Now we process the accepted queue items, one more time.

    >>> script = os.path.join(config.root, "scripts", "process-accepted.py")
    >>> process = subprocess.Popen([sys.executable, script, "ubuntutest",
    ...                             "-q"])
    >>> process.wait()
    0

These packages must now be in the publishing history. Let's check it.

    >>> from lp.soyuz.model.publishing import (
    ...     SourcePackagePublishingHistory as SSPPH)
    >>> package_names.sort()
    >>> for name in package_names:
    ...     spn = SourcePackageName.selectOneBy(name=name)
    ...     spr = SourcePackageRelease.selectOneBy(sourcepackagenameID=spn.id)
    ...     sspph = SSPPH.selectOneBy(sourcepackagereleaseID=spr.id)
    ...     if sspph:
    ...         print name, sspph.status.title
    ...     else:
    ...         print name, 'not Published'
    drdsl Pending
    etherwake Pending

Invoke the publisher script against the 'ubuntutest' distribution:

    >>> script = os.path.join(config.root, "scripts", "publish-distro.py")
    >>> process = subprocess.Popen([sys.executable, script, "-vvCq",
    ...                             "-d", "ubuntutest"],
    ...                            stdout=subprocess.PIPE,
    ...                            stderr=subprocess.PIPE)
    >>> stdout, stderr = process.communicate()
    >>> print stdout

    >>> print stderr
    DEBUG Initialising zopeless.
    DEBUG Distribution: ubuntutest
    ...
    DEBUG Added /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08.orig.tar.gz from library
    DEBUG Added /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08-1.diff.gz from library
    DEBUG Added /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08-1.dsc from library
    ...

# XXX cprov 2006-04-12 bug=3989

    >>> flush_database_updates()
    >>> transaction.commit()
    >>> from canonical.database.sqlbase import clear_current_connection_cache
    >>> clear_current_connection_cache()

Check that the 'etherwake' source package was correctly published and
is in the filesystem archive. We are looking for the DSC, the gzipped
original source and the gzipped package diff:

    >>> len(os.listdir(
    ...     "/var/tmp/archive/ubuntutest/pool/universe/e/etherwake"))
    3

Check the generation of a correct Sources tag file for the universe
component of ubuntutest/breezy-autotest, containing only the required
entry for 'etherwake':

    >>> sources = open(
    ...     "/var/tmp/archive/ubuntutest/dists/breezy-autotest"
    ...     "/universe/source/Sources").read()
    >>> import re
    >>> sources = re.subn(r'(?sm)^Checksums-.*?(?=^[^ ])', '', sources)[0]
    >>> print sources + '\nEND'
    Package: etherwake
    Binary: etherwake
    Version: 1.08-1
    Section: universe/net
    Maintainer: Alain Schroeder
    Build-Depends: debhelper (>> 2.0)
    Architecture: any
    Standards-Version: 3.5.10.0
    Format: 1.0
    Directory: pool/universe/e/etherwake
    Files:
     f13711c5b8261fbb77b43ae0e8ba9360 566 etherwake_1.08-1.dsc
     c2dc10f98bac012b900fd0b46721fc80 4455 etherwake_1.08.orig.tar.gz
     95c1e89e3ad7bc8740793bdf7aeb7334 4145 etherwake_1.08-1.diff.gz
    END

# XXX: maxb 2010-04-15 bug=563503
# The regex munging above can be removed once the tests no longer need
# to pass on Karmic and earlier.

Now we invoke changeOverride on the just-published etherwake, moving it
to the 'multiverse' component:

    >>> ubuntutest = getUtility(IDistributionSet)['ubuntutest']
    >>> breezy_autotest = ubuntutest['breezy-autotest']
    >>> etherwake = breezy_autotest.getSourcePackage('etherwake')
    >>> etherwake_drspr = etherwake.currentrelease
    >>> override = etherwake_drspr.current_published.changeOverride(
    ...     new_component=getUtility(IComponentSet)['multiverse'])

Check that we have a new pending publishing record, as expected:

    >>> for pub in SSPPH.selectBy(
    ...     sourcepackagereleaseID=etherwake_drspr.sourcepackagerelease.id,
    ...     orderBy=['id']):
    ...     print pub.status.name, pub.component.name, pub.pocket.name
    PUBLISHED universe RELEASE
    PENDING multiverse RELEASE

Force the database changes, so they can be seen by the external script:

# XXX cprov 2006-04-12 bug=3989:

    >>> flush_database_updates()
    >>> transaction.commit()

Invoke the publisher script again to land our changes in the archive:

    >>> script = os.path.join(config.root, "scripts", "publish-distro.py")
    >>> process = subprocess.Popen([sys.executable, script, "-vvCq",
    ...                             "-d", "ubuntutest"],
    ...                            stdout=subprocess.PIPE,
    ...                            stderr=subprocess.PIPE)
    >>> stdout, stderr = process.communicate()
    >>> process.returncode
    0

Check that careful publishing took place, as requested with -C. In
careful publishing mode, publish-distro will attempt to publish files
which are already marked as published in the database and, if the files
are already on disk, verify that their contents are as expected.

    >>> print stderr
    DEBUG Initialising zopeless.
    DEBUG Distribution: ubuntutest
    ...
    DEBUG /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08.orig.tar.gz is already in pool with the same content.
    DEBUG /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08-1.diff.gz is already in pool with the same content.
    DEBUG /var/tmp/archive/ubuntutest/pool/universe/e/etherwake/etherwake_1.08-1.dsc is already in pool with the same content.
    ...
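The "already in pool with the same content" check boils down to a
digest comparison between the file on disk and the copy kept in the
librarian. A minimal sketch, with hypothetical names (the publisher's
real pool code is more involved):

    import hashlib

    def same_content(pool_path, library_content):
        """True if the on-disk pool file matches the librarian's copy."""
        on_disk = open(pool_path, 'rb').read()
        return (hashlib.sha1(on_disk).hexdigest() ==
                hashlib.sha1(library_content).hexdigest())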
Invalidate the SQLObject cache to cope with the publisher's changes:

    >>> clear_current_connection_cache()

Check the publishing history again:

    >>> for pub in SSPPH.selectBy(
    ...     sourcepackagereleaseID=etherwake_drspr.sourcepackagerelease.id,
    ...     orderBy=['id']):
    ...     print pub.status.name, pub.component.name, pub.pocket.name
    SUPERSEDED universe RELEASE
    PUBLISHED multiverse RELEASE

Check that the package was moved properly to the 'multiverse'
component:

    >>> main_sources = open(
    ...     "/var/tmp/archive/ubuntutest/dists/breezy-autotest"
    ...     "/main/source/Sources").read()
    >>> print main_sources + '\nEND'
    END

    >>> multiverse_sources = open(
    ...     "/var/tmp/archive/ubuntutest/dists/breezy-autotest"
    ...     "/multiverse/source/Sources").read()
    >>> print multiverse_sources + '\nEND'
    Package: drdsl
    ...
    Package: etherwake
    ...
    END


== Release File ==

The publish-distro.py script writes an appropriate Release file,
containing the suite in question and a list of checksums (MD5, SHA1
and SHA256) for each index published.

# XXX cprov 2006-12-13: the trailing space on Architectures is a
# side-effect caused by the absence of published binaries in this
# suite. It should not happen in real conditions.

    >>> releasefile_contents = open("/var/tmp/archive/ubuntutest/dists/"
    ...                             "breezy-autotest/Release").read()
    >>> print releasefile_contents + '\nEND' #doctest: -NORMALIZE_WHITESPACE
    Origin: ubuntutest
    Label: ubuntutest
    Suite: breezy-autotest
    Version: 6.6.6
    Codename: breezy-autotest
    Date: ...
    Architectures: 
    Components: main restricted universe multiverse
    Description: ubuntutest Breezy Badger Autotest 6.6.6
    MD5Sum:
     a5e5742a193740f17705c998206e18b6              114 main/source/Release
    ...
    SHA1:
     6222b7e616bcc20a32ec227254ad9de8d4bd5557              114 main/source/Release
    ...
    SHA256:
     297125e9b0f5da85552691597c9c4920aafd187e18a4e01d2ba70d8d106a6338              114 main/source/Release
    ...
    END


Testing archive-cruft-check-ng behaviour
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Define the path to the script and run it:

    >>> script = os.path.join(config.root, "scripts", "ftpmaster-tools",
    ...                       "archive-cruft-check.py")
    >>> process = subprocess.Popen([sys.executable, script, "-vn",
    ...                             "-d", "ubuntutest",
    ...                             "-s", "breezy-autotest",
    ...                             "/var/tmp/archive"],
    ...                            stdout=subprocess.PIPE,
    ...                            stderr=subprocess.PIPE)
    >>> stdout, stderr = process.communicate()
    >>> process.returncode
    0
    >>> print stderr
    DEBUG Acquiring lock
    DEBUG Initialising connection.
    DEBUG Considering Sources:
    DEBUG Processing /var/tmp/archive/ubuntutest/dists/breezy-autotest/restricted/source/Sources.gz
    DEBUG Processing /var/tmp/archive/ubuntutest/dists/breezy-autotest/main/source/Sources.gz
    DEBUG Processing /var/tmp/archive/ubuntutest/dists/breezy-autotest/multiverse/source/Sources.gz
    DEBUG Processing /var/tmp/archive/ubuntutest/dists/breezy-autotest/universe/source/Sources.gz
    DEBUG Building not build from source list (NBS):
    DEBUG Building all superseded by any list (ASBA):
    DEBUG No NBS found

Nice! That's enough for now; let's clean everything up.

    >>> import shutil
    >>> shutil.rmtree(temp_dir)

Remove the test archive from the filesystem, and shut down the
keyserver.

    >>> shutil.rmtree("/var/tmp/archive/")
    >>> keyserver.tearDown()

Feito! ;-)


vim:ft=doctest:ts=4:sw=4:et