poky/meta/lib/oeqa/selftest/cases/runcmd.py
Richard Purdie ffae400179 meta/lib+scripts: Convert to SPDX license headers
This adds SPDX license headers in place of the wide assortment of things
currently in our script headers. We default to GPL-2.0-only except for the
oeqa code where it was clearly submitted and marked as MIT on the most part
or some scripts which had the "or later" GPL versioning.

The patch also drops other obsolete bits of file headers where they were
encoountered such as editor modelines, obsolete maintainer information or
the phrase "All rights reserved" which is now obsolete and not required in
copyright headers (in this case its actually confusing for licensing as all
rights were not reserved).

More work is needed for OE-Core but this takes care of the bulk of the scripts
and meta/lib directories.

The top level LICENSE files are tweaked to match the new structure and the
SPDX naming.

(From OE-Core rev: f8c9c511b5f1b7dbd45b77f345cb6c048ae6763e)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2019-05-09 16:31:55 +01:00

122 lines
4.6 KiB
Python

#
# SPDX-License-Identifier: MIT
#
from oeqa.selftest.case import OESelftestTestCase
from oeqa.utils.commands import runCmd
from oeqa.utils import CommandError
import subprocess
import threading
import time
import signal
class MemLogger(object):
def __init__(self):
self.info_msgs = []
self.error_msgs = []
def info(self, msg):
self.info_msgs.append(msg)
def error(self, msg):
self.error_msgs.append(msg)
class RunCmdTests(OESelftestTestCase):
""" Basic tests for runCmd() utility function """
# The delta is intentionally smaller than the timeout, to detect cases where
# we incorrectly apply the timeout more than once.
TIMEOUT = 5
DELTA = 3
def test_result_okay(self):
result = runCmd("true")
self.assertEqual(result.status, 0)
def test_result_false(self):
result = runCmd("false", ignore_status=True)
self.assertEqual(result.status, 1)
def test_shell(self):
# A shell is used for all string commands.
result = runCmd("false; true", ignore_status=True)
self.assertEqual(result.status, 0)
def test_no_shell(self):
self.assertRaises(FileNotFoundError,
runCmd, "false; true", shell=False)
def test_list_not_found(self):
self.assertRaises(FileNotFoundError,
runCmd, ["false; true"])
def test_list_okay(self):
result = runCmd(["true"])
self.assertEqual(result.status, 0)
def test_result_assertion(self):
self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
runCmd, "echo foobar >&2; false", shell=True)
def test_result_exception(self):
self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
runCmd, "echo foobar >&2; false", shell=True, assert_error=False)
def test_output(self):
result = runCmd("echo stdout; echo stderr >&2", shell=True)
self.assertEqual("stdout\nstderr", result.output)
self.assertEqual("", result.error)
def test_output_split(self):
result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
self.assertEqual("stdout", result.output)
self.assertEqual("stderr", result.error)
def test_timeout(self):
numthreads = threading.active_count()
start = time.time()
# Killing a hanging process only works when not using a shell?!
result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
self.assertEqual(result.status, -signal.SIGTERM)
end = time.time()
self.assertLess(end - start, self.TIMEOUT + self.DELTA)
self.assertEqual(numthreads, threading.active_count())
def test_timeout_split(self):
numthreads = threading.active_count()
start = time.time()
# Killing a hanging process only works when not using a shell?!
result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
self.assertEqual(result.status, -signal.SIGTERM)
end = time.time()
self.assertLess(end - start, self.TIMEOUT + self.DELTA)
self.assertEqual(numthreads, threading.active_count())
def test_stdin(self):
numthreads = threading.active_count()
result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
self.assertEqual("hello world", result.output)
self.assertEqual(numthreads, threading.active_count())
def test_stdin_timeout(self):
numthreads = threading.active_count()
start = time.time()
result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
self.assertEqual(result.status, -signal.SIGTERM)
end = time.time()
self.assertLess(end - start, self.TIMEOUT + self.DELTA)
self.assertEqual(numthreads, threading.active_count())
def test_log(self):
log = MemLogger()
result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
self.assertEqual([], log.error_msgs)
def test_log_split(self):
log = MemLogger()
result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
self.assertEqual(["stderr"], log.error_msgs)