Simon Glass | 7057d02 | 2018-10-01 21:12:47 -0600 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # SPDX-License-Identifier: GPL-2.0+ |
| 3 | # |
| 4 | # Modified by: Corey Goldberg, 2013 |
| 5 | # |
| 6 | # Original code from: |
| 7 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) |
| 8 | # Copyright (C) 2005-2011 Canonical Ltd |
| 9 | |
| 10 | """Python testtools extension for running unittest suites concurrently. |
| 11 | |
| 12 | The `testtools` project provides a ConcurrentTestSuite class, but does |
| 13 | not provide a `make_tests` implementation needed to use it. |
| 14 | |
| 15 | This allows you to parallelize a test run across a configurable number |
| 16 | of worker processes. While this can speed up CPU-bound test runs, it is |
| 17 | mainly useful for IO-bound tests that spend most of their time waiting for |
| 18 | data to arrive from someplace else and can benefit from concurrency. |
| 19 | |
| 20 | Unix only. |
| 21 | """ |
| 22 | |
| 23 | import os |
| 24 | import sys |
| 25 | import traceback |
| 26 | import unittest |
| 27 | from itertools import cycle |
| 28 | from multiprocessing import cpu_count |
| 29 | |
| 30 | from subunit import ProtocolTestCase, TestProtocolClient |
| 31 | from subunit.test_results import AutoTimingTestResultDecorator |
| 32 | |
| 33 | from testtools import ConcurrentTestSuite, iterate_tests |
| 34 | |
| 35 | |
# Public API of this module. The name must be spelled ``__all__`` (two
# leading underscores) for ``from <module> import *`` to honour it.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]
| 41 | |
| 42 | |
# Default number of worker processes: one per CPU reported by the OS.
CPU_COUNT = cpu_count()


def fork_for_tests(concurrency_num=CPU_COUNT):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.

    :return: A callable which takes a TestSuite, forks one child process
        per partition of that suite and returns a list with one
        ProtocolTestCase per child.
    """
    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object. Its tests are moved into the
            per-process suites, leaving it empty.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: run this partition and report over the pipe.
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise it's a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        TestProtocolClient(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() does not flush stdio buffers, so push any
                    # buffered results to the parent before exiting.
                    stream.flush()
                except BaseException:
                    # Deliberately catch everything (including SystemExit
                    # and KeyboardInterrupt): the child must never return
                    # into the parent's code path.
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        # The stream is binary, so the text traceback has to
                        # be encoded before it can be written.
                        stream.write(
                            traceback.format_exc().encode('utf-8', 'replace'))
                        stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent: keep only the read end and wrap it as a test.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                test = ProtocolTestCase(stream)
                result.append(test)
        return result
    return do_fork
| 100 | |
| 101 | |
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: suite whose tests are flattened with `iterate_tests`.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of `count` lists of tests. Some lists are empty when
        there are fewer tests than partitions.

    :raises ValueError: if count is less than 1, because there would be no
        partition to hold the tests and they would be silently dropped.
    """
    if count < 1:
        raise ValueError('count must be at least 1, got %r' % (count,))
    # This just assigns tests in a round-robin fashion. On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition. So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    for partition, test in zip(cycle(partitions), iterate_tests(suite)):
        partition.append(test)
    return partitions
| 114 | |
| 115 | |
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Four placeholder tests which only sleep, for demonstration."""

        def test_me_1(self):
            time.sleep(0.5)

        def test_me_2(self):
            time.sleep(0.5)

        def test_me_3(self):
            time.sleep(0.5)

        def test_me_4(self):
            time.sleep(0.5)

    def load_sample_suite():
        """Return a freshly loaded suite of the SampleTestCase tests."""
        return unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)

    sequential_suite = load_sample_suite()
    text_runner = unittest.TextTestRunner()

    # First pass: every test runs one after another in this process
    text_runner.run(sequential_suite)

    # Second pass: a newly loaded copy of the tests, spread over 4 processes
    parallel_suite = ConcurrentTestSuite(
        load_sample_suite(), fork_for_tests(4))
    text_runner.run(parallel_suite)