bitbake: cooker: Use a queue to feed parsing jobs

Curerntly, recipes to parse are split into equal groups and passed to
each parse thread at the start of parsing. We can replace this with
a queue and collect a new job as each parsing process becomes idle
to better spread load in the case of slow parsing jobs.

Some of the data we need has to be passed in at fork time since it
can't be pickled, so the job to parse is only referenced as an index
in that list.

This should better spread load for slow to parse recipes such as those
with many class extensions.

(Bitbake rev: 1bcc12929de4cea9f85ad6283174cf5a08f09cbb)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2025-07-02 19:01:11 +01:00
parent 33c7234431
commit 16bd788ea0

View File

@ -1998,8 +1998,9 @@ class ParsingFailure(Exception):
Exception.__init__(self, realexception, recipe) Exception.__init__(self, realexception, recipe)
class Parser(multiprocessing.Process): class Parser(multiprocessing.Process):
def __init__(self, jobs, results, quit, profile): def __init__(self, jobs, jobid_queue, results, quit, profile):
self.jobs = jobs self.jobs = jobs
self.jobid_queue = jobid_queue
self.results = results self.results = results
self.quit = quit self.quit = quit
multiprocessing.Process.__init__(self) multiprocessing.Process.__init__(self)
@ -2064,12 +2065,14 @@ class Parser(multiprocessing.Process):
if self.quit.is_set(): if self.quit.is_set():
break break
job = None jobid = None
try: try:
job = self.jobs.pop() jobid = self.jobid_queue.get(True, 0.5)
except IndexError: except (ValueError, OSError):
havejobs = False havejobs = False
if job:
if jobid is not None:
job = self.jobs[jobid]
result = self.parse(*job) result = self.parse(*job)
# Clear the siggen cache after parsing to control memory usage, its huge # Clear the siggen cache after parsing to control memory usage, its huge
bb.parse.siggen.postparsing_clean_cache() bb.parse.siggen.postparsing_clean_cache()
@ -2082,6 +2085,7 @@ class Parser(multiprocessing.Process):
except queue.Full: except queue.Full:
pending.append(result) pending.append(result)
finally: finally:
self.jobs.close()
self.results.close() self.results.close()
self.results.join_thread() self.results.join_thread()
@ -2134,13 +2138,13 @@ class CookerParser(object):
self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array)
self.fromcache = set() self.fromcache = set()
self.willparse = set() self.willparse = []
for mc in self.cooker.multiconfigs: for mc in self.cooker.multiconfigs:
for filename in self.mcfilelist[mc]: for filename in self.mcfilelist[mc]:
appends = self.cooker.collections[mc].get_file_appends(filename) appends = self.cooker.collections[mc].get_file_appends(filename)
layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2]
if not self.bb_caches[mc].cacheValid(filename, appends): if not self.bb_caches[mc].cacheValid(filename, appends):
self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername))
else: else:
self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername))
@ -2159,22 +2163,27 @@ class CookerParser(object):
def start(self): def start(self):
self.results = self.load_cached() self.results = self.load_cached()
self.processes = [] self.processes = []
if self.toparse: if self.toparse:
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
self.toparse_queue = multiprocessing.Queue(len(self.willparse))
self.parser_quit = multiprocessing.Event() self.parser_quit = multiprocessing.Event()
self.result_queue = multiprocessing.Queue() self.result_queue = multiprocessing.Queue()
def chunkify(lst,n): for jobid in range(len(self.willparse)):
return [lst[i::n] for i in range(n)] self.toparse_queue.put(jobid)
self.jobs = chunkify(list(self.willparse), self.num_processes)
# Have to pass in willparse at fork time so all parsing processes have the unpickleable data
# then access it by index from the parse queue.
for i in range(0, self.num_processes): for i in range(0, self.num_processes):
parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile) parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile)
parser.start() parser.start()
self.process_names.append(parser.name) self.process_names.append(parser.name)
self.processes.append(parser) self.processes.append(parser)
self.toparse_queue.close()
self.results = itertools.chain(self.results, self.parse_generator()) self.results = itertools.chain(self.results, self.parse_generator())
def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"): def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"):