utils: Add multiprocess_launch API and testcase

The current methods of spawning processes for parallel execution have
issues around collection of results or exceptions.

Take the code from package_ipk/deb, make it generic, add a results
collection mechanism, fix the exception handling and form it into a
standard library function.

Also add a test case which tests both the success and failure modes
of operation to stop this functionality regressing again.

In particular, compared to multiprocess_exec, this fork off the parent
approach means we can pass in the datastore and functions work in the
same scope as the parent. This removes some of the complexities
found trying to scale multiprocess_exec to wider use.

(From OE-Core rev: 88f0c214e593a45566df5131bda4c946f5ccc8c2)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2018-07-19 20:31:35 +00:00
parent 4c67ffef2e
commit 6d66b57409
2 changed files with 115 additions and 1 deletions

View File

@ -1,4 +1,6 @@
import subprocess
import multiprocessing
import traceback
def read_file(filename):
try:
@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
return results
# For each item in items, call the function 'target' with item as the first
# argument, extraargs as the other arguments and handle any exceptions in the
# parent process
def multiprocess_launch(target, items, d, extraargs=None):
    """Run target(item, *extraargs) in a separate process for every item.

    At most BB_NUMBER_THREADS (falling back to the CPU count, then 1)
    processes run at once. Items are taken from the end of 'items' and
    results are collected as the processes finish, so the order of the
    returned list is unrelated to the order of 'items'.

    target    -- callable executed in the child process
    items     -- iterable of values, each passed as target's first argument
    d         -- datastore; only BB_NUMBER_THREADS is read from it here
    extraargs -- optional sequence of further positional arguments

    Returns the list of truthy values returned by target; falsy return
    values (None, 0, "", ...) are not included.

    If any child raises an exception, or exits with a non-zero status without
    reporting anything, no further items are started, the children that are
    already running are waited for, each traceback is passed to bb.error()
    and bb.fatal() is called.
    """
    # Local import: keeps this function independent of the module's import block.
    from multiprocessing.connection import wait as wait_for_any

    # ProcessLaunch is a local class and targets may be closures, neither of
    # which can be pickled, so the children must be created with fork().
    # Request it explicitly instead of relying on the interpreter default.
    try:
        mp_ctx = multiprocessing.get_context("fork")
    except ValueError:
        mp_ctx = multiprocessing.get_context()

    class ProcessLaunch(mp_ctx.Process):
        """Process that reports its return value or exception over a pipe."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._pconn, self._cconn = mp_ctx.Pipe()
            self._exception = None
            self._result = None
            self._reported = False

        def run(self):
            # Runs in the child: send (None, return value) on success or
            # (exception, formatted traceback) on failure.
            try:
                ret = self._target(*self._args, **self._kwargs)
                self._cconn.send((None, ret))
            except Exception as e:
                tb = traceback.format_exc()
                try:
                    self._cconn.send((e, tb))
                except Exception:
                    # The exception object itself could not be pickled;
                    # still deliver the traceback text to the parent.
                    self._cconn.send((RuntimeError(repr(e)), tb))

        def update(self):
            # Runs in the parent: pick up the child's message if one is pending.
            if self._pconn.poll():
                (e, tb) = self._pconn.recv()
                self._reported = True
                if e is not None:
                    self._exception = (e, tb)
                else:
                    self._result = tb

        def waitables(self):
            # Objects that become ready when the child sends data or exits.
            return (self.sentinel, self._pconn)

        @property
        def reported(self):
            return self._reported

        @property
        def exception(self):
            self.update()
            return self._exception

        @property
        def result(self):
            self.update()
            return self._result

    # Never allow fewer than one slot, otherwise nothing would ever be started
    # and the loop below would not terminate.
    max_process = max(1, int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1))
    extra = () if extraargs is None else tuple(extraargs)

    launched = []
    errors = []
    results = []
    items = list(items)
    while (items and not errors) or launched:
        if items and not errors and len(launched) < max_process:
            p = ProcessLaunch(target=target, args=(items.pop(),) + extra)
            p.start()
            launched.append(p)
        else:
            # Nothing can be started right now, so block until a child sends
            # data or exits instead of spinning. The timeout is a safety net.
            wait_for_any([h for q in launched for h in q.waitables()], timeout=1.0)

        still_running = []
        for q in launched:
            # Drain the pipe while the child is still alive: a payload larger
            # than the pipe buffer blocks the child in send() until it is
            # read, so waiting for it to exit first would deadlock.
            q.update()
            # The finished processes are joined when calling is_alive()
            if q.is_alive():
                still_running.append(q)
                continue
            # The properties poll the pipe once more, which picks up a message
            # sent between the update() above and the child exiting.
            failure = q.exception
            if failure:
                errors.append(failure)
            elif not q.reported and q.exitcode != 0:
                # The child died (signal, hard exit, ...) without telling us
                # anything; do not mistake that for success.
                msg = "Subprocess %s exited with code %s without reporting a result" % (q.pid, q.exitcode)
                errors.append((RuntimeError(msg), msg))
            if q.result:
                results.append(q.result)
        # Rebuild the list rather than removing entries while iterating over it.
        launched = still_running

    if errors:
        for (e, tb) in errors:
            bb.error(str(tb))
        bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
    return results
def squashspaces(string):
    """Collapse every run of whitespace in 'string' into a single space and
    strip leading and trailing whitespace from the result."""
    import re
    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
    return re.sub(r"\s+", " ", string).strip()

View File

@ -1,5 +1,8 @@
import sys
from unittest.case import TestCase
from oe.utils import packages_filter_out_system, trim_version
from contextlib import contextmanager
from io import StringIO
from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
class TestPackagesFilterOutSystem(TestCase):
def test_filter(self):
@ -49,3 +52,44 @@ class TestTrimVersion(TestCase):
self.assertEqual(trim_version("1.2.3", 2), "1.2")
self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
class TestMultiprocessLaunch(TestCase):
    """Exercise multiprocess_launch with succeeding and failing targets."""

    def test_multiprocesslaunch(self):
        import bb

        def testfunction(item, d):
            # Items "1" and "2" simulate a failure inside the subprocess.
            if item == "2" or item == "1":
                raise KeyError("Invalid number %s" % item)
            return "Found %s" % item

        def dummyerror(msg):
            # Stand-in for bb.error that prints to stdout so the test can capture it.
            print("ERROR: %s" % msg)

        @contextmanager
        def captured_output():
            # Temporarily replace sys.stdout/sys.stderr with in-memory buffers.
            new_out, new_err = StringIO(), StringIO()
            old_out, old_err = sys.stdout, sys.stderr
            try:
                sys.stdout, sys.stderr = new_out, new_err
                yield sys.stdout, sys.stderr
            finally:
                sys.stdout, sys.stderr = old_out, old_err

        d = bb.data_smart.DataSmart()

        # Put the real bb.error back when the test ends, pass or fail, so the
        # stub cannot leak into tests that run afterwards.
        self.addCleanup(setattr, bb, "error", bb.error)
        bb.error = dummyerror

        # Assert the function returns the right results
        result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
        self.assertIn("Found 3", result)
        self.assertIn("Found 4", result)
        self.assertIn("Found 5", result)
        self.assertIn("Found 6", result)
        self.assertEqual(len(result), 4)

        # Assert the function prints exceptions
        with captured_output() as (out, err):
            self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
        self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
        self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())