
We have other places in the code where we need to take filemode/mask information from a bitbake variable and turn it into a real python number. Turn this internal code into public API in bb.utils and add some tests for it. (Bitbake rev: d89e30fb2fb15b09f2cb95c4e5aa9f749ca257ea) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
20 KiB
Executable File
#!/usr/bin/env python3
Copyright BitBake Contributors
SPDX-License-Identifier: GPL-2.0-only
import os import sys import warnings warnings.simplefilter("default") warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.may.lead.to.deadlocks.in.the.child.") sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) from bb import fetch2 import logging import bb import select import errno import signal import pickle import traceback import queue import shlex import subprocess import fcntl from multiprocessing import Lock from threading import Thread
Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'): fcntl.F_SETPIPE_SZ = 1031
bb.utils.check_system_locale()
Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") sys.exit(1)
profiling = False if sys.argv[1].startswith("decafbadbad"): profiling = True try: import cProfile as profile except: import profile
Unbuffer stdout to avoid log truncation in the event
of an unorderly exit as well as to provide timely
updates to log files for use with tail
try: if sys.stdout.name == '': fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) fl |= os.O_SYNC fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) except: pass
logger = logging.getLogger("BitBake")
worker_pipe = sys.stdout.fileno() bb.utils.nonblockingfd(worker_pipe)
Try to make the pipe buffers larger as it is much more efficient. If we can't
e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try: fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024) except: pass
Need to guard against multiprocessing being used in child processes
and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None
handler = bb.event.LogHandler() logger.addHandler(handler)
if 0: # Code to write out a log file of all events passing through the worker logfilename = "/tmp/workerlogfile" format_str = "%(levelname)s: %(message)s" conlogformat = bb.msg.BBLogFormatter(format_str) consolelog = logging.FileHandler(logfilename) consolelog.setFormatter(conlogformat) logger.addHandler(consolelog)
worker_queue = queue.Queue()
def worker_fire(event, d): data = b"" + pickle.dumps(event) + b"" worker_fire_prepickled(data)
def worker_fire_prepickled(event): global worker_queue
worker_queue.put(event)
We can end up with write contention with the cooker, it can be trying to send commands
and we can be trying to send event data back. Therefore use a separate thread for writing
back data to cooker.
worker_thread_exit = False
def worker_flush(worker_queue): worker_queue_int = bytearray() global worker_pipe, worker_thread_exit
while True:
try:
worker_queue_int.extend(worker_queue.get(True, 1))
except queue.Empty:
pass
while (worker_queue_int or not worker_queue.empty()):
try:
(_, ready, _) = select.select([], [worker_pipe], [], 1)
if not worker_queue.empty():
worker_queue_int.extend(worker_queue.get())
written = os.write(worker_pipe, worker_queue_int)
del worker_queue_int[0:written]
except (IOError, OSError) as e:
if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
raise
if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
return
worker_thread = Thread(target=worker_flush, args=(worker_queue,)) worker_thread.start()
def worker_child_fire(event, d): global worker_pipe global worker_pipe_lock
data = b"<event>" + pickle.dumps(event) + b"</event>"
try:
with bb.utils.lock_timeout(worker_pipe_lock):
while(len(data)):
written = worker_pipe.write(data)
data = data[written:]
except IOError:
sigterm_handler(None, None)
raise
bb.event.worker_fire = worker_fire
lf = None #lf = open("/tmp/workercommandlog", "w+") def workerlog_write(msg): if lf: lf.write(msg) lf.flush()
def sigterm_handler(signum, frame): signal.signal(signal.SIGTERM, signal.SIG_DFL) os.killpg(0, signal.SIGTERM) sys.exit()
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
fn = runtask['fn']
task = runtask['task']
taskname = runtask['taskname']
taskhash = runtask['taskhash']
unihash = runtask['unihash']
appends = runtask['appends']
layername = runtask['layername']
taskdepdata = runtask['taskdepdata']
quieterrors = runtask['quieterrors']
# We need to setup the environment BEFORE the fork, since
# a fork() or exec*() activates PSEUDO...
envbackup = {}
fakeroot = False
fakeenv = {}
umask = None
uid = os.getuid()
gid = os.getgid()
taskdep = runtask['taskdep']
if 'umask' in taskdep and taskname in taskdep['umask']:
umask = taskdep['umask'][taskname]
elif workerdata["umask"]:
umask = workerdata["umask"]
if umask:
# Convert to a python numeric value as it could be a string
umask = bb.utils.to_filemode(umask)
dry_run = cfg.dry_run or runtask['dry_run']
# We can't use the fakeroot environment in a dry run as it possibly hasn't been built
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
fakeroot = True
envvars = (runtask['fakerootenv'] or "").split()
for key, value in (var.split('=',1) for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
fakedirs = (runtask['fakerootdirs'] or "").split()
for p in fakedirs:
bb.utils.mkdirhier(p)
logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
(fn, taskname, ', '.join(fakedirs)))
else:
envvars = (runtask['fakerootnoenv'] or "").split()
for key, value in (var.split('=',1) for var in envvars):
envbackup[key] = os.environ.get(key)
os.environ[key] = value
fakeenv[key] = value
sys.stdout.flush()
sys.stderr.flush()
try:
pipein, pipeout = os.pipe()
pipein = os.fdopen(pipein, 'rb', 4096)
pipeout = os.fdopen(pipeout, 'wb', 0)
pid = os.fork()
except OSError as e:
logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
sys.exit(1)
if pid == 0:
def child():
global worker_pipe
global worker_pipe_lock
pipein.close()
bb.utils.signal_on_parent_exit("SIGTERM")
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_fire = worker_child_fire
worker_pipe = pipeout
worker_pipe_lock = Lock()
# Make the child the process group leader and ensure no
# child process will be controlled by the current terminal
# This ensures signals sent to the controlling terminal like Ctrl+C
# don't stop the child processes.
os.setsid()
signal.signal(signal.SIGTERM, sigterm_handler)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, sigterm_handler)
# No stdin & stdout
# stdout is used as a status report channel and must not be used by child processes.
dumbio = os.open(os.devnull, os.O_RDWR)
os.dup2(dumbio, sys.stdin.fileno())
os.dup2(dumbio, sys.stdout.fileno())
if umask is not None:
os.umask(umask)
try:
(realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
the_data = databuilder.mcdata[mc]
the_data.setVar("BB_WORKERCONTEXT", "1")
the_data.setVar("BB_TASKDEPDATA", taskdepdata)
the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
if cfg.limited_deps:
the_data.setVar("BB_LIMITEDDEPS", "1")
the_data.setVar("BUILDNAME", workerdata["buildname"])
the_data.setVar("DATE", workerdata["date"])
the_data.setVar("TIME", workerdata["time"])
for varname, value in extraconfigdata.items():
the_data.setVar(varname, value)
bb.parse.siggen.set_taskdata(workerdata["sigdata"])
if "newhashes" in workerdata:
bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
ret = 0
the_data = databuilder.parseRecipe(fn, appends, layername)
the_data.setVar('BB_TASKHASH', taskhash)
the_data.setVar('BB_UNIHASH', unihash)
bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)
bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
if bb.utils.is_local_uid(uid):
logger.debug("Attempting to disable network for %s" % taskname)
bb.utils.disable_network(uid, gid)
else:
logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
# exported_vars() returns a generator which *cannot* be passed to os.environ.update()
# successfully. We also need to unset anything from the environment which shouldn't be there
exports = bb.data.exported_vars(the_data)
bb.utils.empty_environment()
for e, v in exports:
os.environ[e] = v
for e in fakeenv:
os.environ[e] = fakeenv[e]
the_data.setVar(e, fakeenv[e])
the_data.setVarFlag(e, 'export', "1")
task_exports = the_data.getVarFlag(taskname, 'exports')
if task_exports:
for e in task_exports.split():
the_data.setVarFlag(e, 'export', '1')
v = the_data.getVar(e)
if v is not None:
os.environ[e] = v
if quieterrors:
the_data.setVarFlag(taskname, "quieterrors", "1")
except Exception:
if not quieterrors:
logger.critical(traceback.format_exc())
os._exit(1)
sys.stdout.flush()
sys.stderr.flush()
try:
if dry_run:
return 0
try:
ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
finally:
if fakeroot:
fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
return ret
except:
os._exit(1)
if not profiling:
os._exit(child())
else:
profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
prof = profile.Profile()
try:
ret = profile.Profile.runcall(prof, child)
finally:
prof.dump_stats(profname)
bb.utils.process_profilelog(profname)
os._exit(ret)
else:
for key, value in iter(envbackup.items()):
if value is None:
del os.environ[key]
else:
os.environ[key] = value
return pid, pipein, pipeout
class runQueueWorkerPipe(): """ Abstraction for a pipe between a worker thread and the worker server """ def init(self, pipein, pipeout): self.input = pipein if pipeout: pipeout.close() bb.utils.nonblockingfd(self.input) self.queue = bytearray()
def read(self):
start = len(self.queue)
try:
self.queue.extend(self.input.read(512*1024) or b"")
except (OSError, IOError) as e:
if e.errno != errno.EAGAIN:
raise
end = len(self.queue)
index = self.queue.find(b"</event>")
while index != -1:
msg = self.queue[:index+8]
assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
worker_fire_prepickled(msg)
self.queue = self.queue[index+8:]
index = self.queue.find(b"</event>")
return (end > start)
def close(self):
while self.read():
continue
if len(self.queue) > 0:
print("Warning, worker child left partial message: %s" % self.queue)
self.input.close()
normalexit = False
class BitbakeWorker(object): def init(self, din): self.input = din bb.utils.nonblockingfd(self.input) self.queue = bytearray() self.cookercfg = None self.databuilder = None self.data = None self.extraconfigdata = None self.build_pids = {} self.build_pipes = {}
signal.signal(signal.SIGTERM, self.sigterm_exception)
# Let SIGHUP exit as SIGTERM
signal.signal(signal.SIGHUP, self.sigterm_exception)
if "beef" in sys.argv[1]:
bb.utils.set_process_name("Worker (Fakeroot)")
else:
bb.utils.set_process_name("Worker")
def sigterm_exception(self, signum, stackframe):
if signum == signal.SIGTERM:
bb.warn("Worker received SIGTERM, shutting down...")
elif signum == signal.SIGHUP:
bb.warn("Worker received SIGHUP, shutting down...")
self.handle_finishnow(None)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.kill(os.getpid(), signal.SIGTERM)
def serve(self):
while True:
(ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
if self.input in ready:
try:
r = self.input.read()
if len(r) == 0:
# EOF on pipe, server must have terminated
self.sigterm_exception(signal.SIGTERM, None)
self.queue.extend(r)
except (OSError, IOError):
pass
if len(self.queue):
self.handle_item(b"cookerconfig", self.handle_cookercfg)
self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
self.handle_item(b"workerdata", self.handle_workerdata)
self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
self.handle_item(b"runtask", self.handle_runtask)
self.handle_item(b"finishnow", self.handle_finishnow)
self.handle_item(b"ping", self.handle_ping)
self.handle_item(b"quit", self.handle_quit)
for pipe in self.build_pipes:
if self.build_pipes[pipe].input in ready:
self.build_pipes[pipe].read()
if len(self.build_pids):
while self.process_waitpid():
continue
def handle_item(self, item, func):
opening_tag = b"<" + item + b">"
if not self.queue.startswith(opening_tag):
return
tag_len = len(opening_tag)
if len(self.queue) < tag_len + 4:
# we need to receive more data
return
header = self.queue[tag_len:tag_len + 4]
payload_len = int.from_bytes(header, 'big')
# closing tag has length (tag_len + 1)
if len(self.queue) < tag_len * 2 + 1 + payload_len:
# we need to receive more data
return
index = self.queue.find(b"</" + item + b">")
if index != -1:
try:
func(self.queue[(tag_len + 4):index])
except pickle.UnpicklingError:
workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
raise
self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
def handle_cookercfg(self, data):
self.cookercfg = pickle.loads(data)
self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
self.databuilder.parseBaseConfiguration(worker=True)
self.data = self.databuilder.data
def handle_extraconfigdata(self, data):
self.extraconfigdata = pickle.loads(data)
def handle_workerdata(self, data):
self.workerdata = pickle.loads(data)
bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
for mc in self.databuilder.mcdata:
self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")
def handle_newtaskhashes(self, data):
self.workerdata["newhashes"] = pickle.loads(data)
def handle_ping(self, _):
workerlog_write("Handling ping\n")
logger.warning("Pong from bitbake-worker!")
def handle_quit(self, data):
workerlog_write("Handling quit\n")
global normalexit
normalexit = True
sys.exit(0)
def handle_runtask(self, data):
runtask = pickle.loads(data)
fn = runtask['fn']
task = runtask['task']
taskname = runtask['taskname']
workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
def process_waitpid(self):
"""
Return none is there are no processes awaiting result collection, otherwise
collect the process exit codes and close the information pipe.
"""
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0 or os.WIFSTOPPED(status):
return False
except OSError:
return False
workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
if os.WIFEXITED(status):
status = os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Per shell conventions for $?, when a process exits due to
# a signal, we return an exit code of 128 + SIGNUM
status = 128 + os.WTERMSIG(status)
task = self.build_pids[pid]
del self.build_pids[pid]
self.build_pipes[pid].close()
del self.build_pipes[pid]
worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
return True
def handle_finishnow(self, _):
if self.build_pids:
logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
for k, v in iter(self.build_pids.items()):
try:
os.kill(-k, signal.SIGTERM)
os.waitpid(-1, 0)
except:
pass
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
try: worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) if not profiling: worker.serve() else: profname = "profile-worker.log" prof = profile.Profile() try: profile.Profile.runcall(prof, worker.serve) finally: prof.dump_stats(profname) bb.utils.process_profilelog(profname) except BaseException as e: if not normalexit: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) finally: worker_thread_exit = True worker_thread.join()
workerlog_write("exiting") if not normalexit: sys.exit(1) sys.exit(0)