11 from _thread import start_new_thread
12 start_new_thread # pyflakes
15 from thread import start_new_thread
16 from gi.repository import GLib
18 from compathelper import _bytes
# Test case for GLib.MainLoop: exceptions raised inside callbacks, signal
# delivery while a second thread exists, and SIGINT handling.
# NOTE(review): the embedded line numbers skip (e.g. 23 -> 28), so some
# statements are missing from this listing; the comments below describe only
# the lines that are visible.
21 class TestMainLoop(unittest.TestCase):
# An exception raised by a child-watch callback should be reported through
# sys.excepthook as soon as the callback returns; the final assertion
# requires got_exception to be false.
22 def test_exception_handling(self):
# Pipe whose read end is waited on below and whose write end is written to.
23 pipe_r, pipe_w = os.pipe()
# Block until the read end of the pipe becomes readable.
28 select.select([pipe_r], [], [])
# Child-watch callback; the trailing `loop` parameter is the user data handed
# to GLib.child_watch_add() below.
32 def child_died(pid, status, loop):
# The "deadbabe" marker is what the temporary excepthook checks for.
34 raise Exception("deadbabe")
36 loop = GLib.MainLoop()
# Ask GLib to call child_died(pid, status, loop) once process `pid` exits.
37 GLib.child_watch_add(GLib.PRIORITY_DEFAULT, pid, child_died, loop)
# Make the read end readable so that the select() above can return.
# NOTE(review): _bytes comes from compathelper; presumably it converts str to
# bytes for os.write() -- confirm.
40 os.write(pipe_w, _bytes("Y"))
# Temporary hook asserting that the reported exception is exactly the one
# raised by child_died (plain Exception carrying the "deadbabe" message).
43 def excepthook(type, value, traceback):
44 self.assertTrue(type is Exception)
45 self.assertEqual(value.args[0], "deadbabe")
46 sys.excepthook = excepthook
# Put the interpreter's original hook back.
54 sys.excepthook = sys.__excepthook__
57 # The exception should be handled (by printing it)
58 # immediately on return from child_died() rather
59 # than here. See bug #303573
61 self.assertFalse(got_exception)
# Run a main loop while SIGUSR1 is delivered to this process and a second
# thread is sleeping.
63 def test_concurrency(self):
# Handler installed for SIGUSR1 for the duration of the test.
64 def on_usr1(signum, frame):
68 # create a thread which will terminate upon SIGUSR1 by way of
69 # interrupting sleep()
# Remember the previous handler so it can be reinstated at the end.
70 orig_handler = signal.signal(signal.SIGUSR1, on_usr1)
71 start_new_thread(time.sleep, (10,))
73 # now create two main loops
74 loop1 = GLib.MainLoop()
75 loop2 = GLib.MainLoop()
# After 100 ms send SIGUSR1 to this very process. os.kill() returns None,
# which is falsy, so the timeout source fires only once.
76 GLib.timeout_add(100, lambda: os.kill(os.getpid(), signal.SIGUSR1))
# After 500 ms stop loop1; quit() likewise returns None, so this is one-shot.
77 GLib.timeout_add(500, loop1.quit)
# Reinstate the SIGUSR1 handler that was active before the test.
81 signal.signal(signal.SIGUSR1, orig_handler)
# SIGINT should surface as KeyboardInterrupt and leave the loop stopped.
83 def test_sigint(self):
# os.getppid() targets the parent of the calling process.
# NOTE(review): presumably this line runs in a forked child so that the
# signal lands on the test process; the fork is not visible here -- confirm.
87 os.kill(os.getppid(), signal.SIGINT)
90 loop = GLib.MainLoop()
# Reaching this line means no KeyboardInterrupt was raised.
93 self.fail('expected KeyboardInterrupt exception')
94 except KeyboardInterrupt:
# The loop must not be left in the running state.
96 self.assertFalse(loop.is_running())