~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to bzrplugins/lpserve/test_lpserve.py

  • Committer: John Arbash Meinel
  • Date: 2010-08-17 20:11:04 UTC
  • mto: This revision was merged to the branch mainline in revision 11773.
  • Revision ID: jameinel@falco-lucid-20100817201104-47d2vrhfk1u7ec31
Start building the testing infrastructure.

Move lpserve into a plugin module rather than a simple script. The main
benefit is that I can now invoke the test suite directly, rather than
having to use the launchpad test runner, which is much heavier weight.
(It takes >10s just to load the tests to filter them down to the test I
want to run, whereas I can run all tests under bzr selftest in 1.4s.)

Create a subclass for testing LPServe in the test suite, that makes
it a bit easier to make sure that the class is started and stopped
appropriately.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
import threading
 
3
 
 
4
from bzrlib import tests, trace
 
5
from bzrlib.plugins import lpserve
 
6
 
 
7
 
 
8
class TestingLPServiceInAThread(lpserve.LPService):
    """Wrap starting and stopping an LPService instance in a thread.

    Use the ``start()`` static method to create an instance: it runs
    ``main_loop()`` in a background thread, waits for the master socket to
    be created, and registers ``stop()`` as a cleanup on the given test.

    Two events track the lifecycle:

    * ``service_started`` is set once ``_create_master_socket()`` returns.
    * ``service_stopped`` is set once ``main_loop()`` exits, whether it
      returned normally or raised.
    """

    # For testing, we set the timeouts much lower, because we want the tests to
    # run quickly
    WAIT_FOR_CHILDREN_TIMEOUT = 0.5
    SOCKET_TIMEOUT = 0.01
    SLEEP_FOR_CHILDREN_TIMEOUT = 0.01

    def __init__(self, host='127.0.0.1', port=0):
        """Create the service, bound by default to a loopback address.

        :param host: Passed through to LPService.__init__.
        :param port: Passed through to LPService.__init__.
        """
        # The events are created before the base initialiser runs so that
        # they already exist if it calls one of the hooks overridden below.
        self.service_started = threading.Event()
        self.service_stopped = threading.Event()
        # The thread running main_loop(); None until start() assigns it and
        # again after stop() has joined it.
        self.this_thread = None
        super(TestingLPServiceInAThread, self).__init__(host=host, port=port)

    def _create_master_socket(self):
        """Create the master socket, then flag the service as started."""
        trace.mutter('creating master socket')
        super(TestingLPServiceInAThread, self)._create_master_socket()
        trace.mutter('setting service_started')
        self.service_started.set()

    def main_loop(self):
        """Run the base main loop, signalling service_stopped when it exits."""
        self.service_stopped.clear()
        try:
            super(TestingLPServiceInAThread, self).main_loop()
        finally:
            # Signal even when the loop dies with an exception. Otherwise
            # stop() would wait out its full timeout and then report a
            # misleading "Failed to stop" error for a thread that is
            # already gone.
            self.service_stopped.set()

    @staticmethod
    def start(test):
        """Start a new LPService in a thread on a random port.

        This will block until the service has created its socket, and is ready
        to communicate.

        :param test: The test case; ``stop()`` is registered on it with
            ``addCleanup`` so the thread is shut down with the test.
        :return: A new TestingLPServiceInAThread instance
        :raises RuntimeError: If the service has not signalled that it
            started within 10 seconds.
        """
        # Allocate a new port on only the loopback device
        new_service = TestingLPServiceInAThread()
        thread = threading.Thread(target=new_service.main_loop,
                                  name='TestingLPServiceInAThread')
        new_service.this_thread = thread
        thread.start()
        new_service.service_started.wait(10.0)
        if not new_service.service_started.isSet():
            # The thread is already running and no cleanup has been
            # registered yet, so ask the loop to exit (the same request
            # stop() makes) rather than leaking a non-daemon thread.
            new_service._should_terminate.set()
            raise RuntimeError(
                'Failed to start the TestingLPServiceInAThread')
        test.addCleanup(new_service.stop)
        # what about returning new_service._sockname ?
        return new_service

    def stop(self):
        """Stop the test-server thread. This can be called multiple times.

        :raises RuntimeError: If the main loop has not signalled that it
            exited within 10 seconds of being asked to terminate.
        """
        if self.this_thread is None:
            # We already stopped the process
            return
        self._should_terminate.set()
        self.service_stopped.wait(10.0)
        if not self.service_stopped.isSet():
            raise RuntimeError(
                'Failed to stop the TestingLPServiceInAThread')
        self.this_thread.join()
        # Break any refcycles
        self.this_thread = None
 
70
 
 
71
 
 
72
class TestTestingLPServiceInAThread(tests.TestCaseWithTransport):
    """Check that the threaded test service can be started and stopped."""

    def test_start_and_stop_service(self):
        """An explicit stop() leaves the service fully shut down."""
        service = TestingLPServiceInAThread.start(self)
        # start() only returns once the service has flagged itself started.
        self.assertTrue(service.service_started.isSet())
        service.stop()
        # A successful stop() has seen the main loop exit and has dropped
        # its reference to the joined thread.
        self.assertTrue(service.service_stopped.isSet())
        self.assertEqual(None, service.this_thread)

    def test_multiple_stops(self):
        """Calling stop() a second time is a no-op, not an error."""
        service = TestingLPServiceInAThread.start(self)
        service.stop()
        self.assertEqual(None, service.this_thread)
        service.stop()
        self.assertEqual(None, service.this_thread)

    def test_autostop(self):
        """A service that is never stopped explicitly is stopped by cleanup."""
        # We shouldn't leak a thread here: start() registers stop() as a
        # cleanup on this test case.
        service = TestingLPServiceInAThread.start(self)
        self.assertTrue(service.service_started.isSet())
        # stop() has not been called yet, so the thread is still referenced.
        self.assertNotEqual(None, service.this_thread)