4
from bzrlib import tests, trace
5
from bzrlib.plugins import lpserve
8
class TestingLPServiceInAThread(lpserve.LPService):
9
"""Wrap starting and stopping an LPService instance in a thread."""
11
# For testing, we set the timeouts much lower, because we want the tests to
# run quickly.
13
WAIT_FOR_CHILDREN_TIMEOUT = 0.5
15
SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
17
def __init__(self, host='127.0.0.1', port=0):
    """Create the service together with the events that track its thread.

    :param host: Host forwarded unchanged to the base class initializer.
    :param port: Port forwarded unchanged to the base class initializer.
    """
    # No thread is attached to the service at construction time.
    self.this_thread = None
    # Both events exist before the base class initializer is invoked.
    self.service_stopped = threading.Event()
    self.service_started = threading.Event()
    base = super(TestingLPServiceInAThread, self)
    base.__init__(host=host, port=port)
23
def _create_master_socket(self):
    """Create the master socket, then flag the service as started.

    Delegates to the base class implementation and sets
    ``service_started`` only after that call has returned. A
    ``trace.mutter`` message is emitted before each of the two steps.
    """
    trace.mutter('creating master socket')
    parent = super(TestingLPServiceInAThread, self)
    parent._create_master_socket()
    trace.mutter('setting service_started')
    started_event = self.service_started
    started_event.set()
30
self.service_stopped.clear()
31
super(TestingLPServiceInAThread, self).main_loop()
32
self.service_stopped.set()
36
"""Start a new LPService in a thread on a random port.
38
This will block until the service has created its socket, and is ready
41
:return: A new TestingLPServiceInAThread instance
43
# Allocate a new port on only the loopback device
44
new_service = TestingLPServiceInAThread()
45
thread = threading.Thread(target=new_service.main_loop,
46
name='TestingLPServiceInAThread')
47
new_service.this_thread = thread
49
new_service.service_started.wait(10.0)
50
if not new_service.service_started.isSet():
52
'Failed to start the TestingLPServiceInAThread')
53
test.addCleanup(new_service.stop)
54
# what about returning new_service._sockname ?
58
"""Stop the test-server thread. This can be called multiple times."""
59
if self.this_thread is None:
60
# We already stopped the process
62
self._should_terminate.set()
63
self.service_stopped.wait(10.0)
64
if not self.service_stopped.isSet():
66
'Failed to stop the TestingLPServiceInAThread')
67
self.this_thread.join()
69
self.this_thread = None
72
class TestTestingLPServiceInAThread(tests.TestCaseWithTransport):
74
def test_start_and_stop_service(self):
75
service = TestingLPServiceInAThread.start(self)
78
def test_multiple_stops(self):
79
service = TestingLPServiceInAThread.start(self)
83
def test_autostop(self):
84
# We shouldn't leak a thread here
85
service = TestingLPServiceInAThread.start(self)