poky/scripts/oe-selftest
Humberto Ibarra b28e6d6039 oe-selftest: check for coverage version before starting tests
python coverage versions lower than 4.x have problems with some distros. Adding the 4.x version as requirement to continue with coverage tracking.

[YOCTO #10207]

(From OE-Core rev: a378b817504986173c4b0984a28aead247589b3f)

Signed-off-by: Humberto Ibarra <humberto.ibarra.lopez@intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
2016-09-15 12:15:07 +01:00

28 KiB
Executable File

#!/usr/bin/env python3

Copyright (c) 2013 Intel Corporation

This program is free software; you can redistribute it and/or modify

it under the terms of the GNU General Public License version 2 as

published by the Free Software Foundation.

This program is distributed in the hope that it will be useful,

but WITHOUT ANY WARRANTY; without even the implied warranty of

MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

GNU General Public License for more details.

You should have received a copy of the GNU General Public License along

with this program; if not, write to the Free Software Foundation, Inc.,

51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

DESCRIPTION

This script runs tests defined in meta/lib/oeqa/selftest/

Its purpose is to automate the testing of different bitbake tools.

To use it you just need to source your build environment setup script and

add the meta-selftest layer to your BBLAYERS.

Call the script as: "oe-selftest -a" to run all the tests in meta/lib/oeqa/selftest/

Call the script as: "oe-selftest -r <module>.<class>" to run just a single test

E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py

import os import sys import unittest import logging import argparse import subprocess import time as t import re import fnmatch import collections import imp

# Put the "lib" directory that sits next to this script on sys.path first, so
# the scriptpath helper can be imported; its add_*_lib_path() helpers are then
# called before argparse_oe is imported.
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
import scriptpath
scriptpath.add_bitbake_lib_path()
scriptpath.add_oe_lib_path()
import argparse_oe

import oeqa.selftest import oeqa.utils.ftools as ftools from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer from oeqa.selftest.base import oeSelfTest, get_available_machines

try: import xmlrunner from xmlrunner.result import _XMLTestResult as TestResult from xmlrunner import XMLTestRunner as _TestRunner except ImportError: # use the base runner instead from unittest import TextTestResult as TestResult from unittest import TextTestRunner as _TestRunner

# Timestamped prefix shared by the log file name (logger_create) and the
# test export directory (TestRunner).
log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S")

def logger_create():
    """Create and return the "selftest" logger.

    A timestamped log file (log_prefix + ".log") is created in the current
    directory and the "oe-selftest.log" symlink is re-pointed at it.  The
    file handler records everything from DEBUG up, while the stdout handler
    only shows INFO and above.

    Returns:
        The configured logging.Logger instance.
    """
    log_file = log_prefix + ".log"
    # lexists() is also true for a dangling symlink (e.g. the previous log
    # file was deleted); exists() would miss it and os.symlink() would then
    # fail with FileExistsError.
    if os.path.lexists("oe-selftest.log"):
        os.remove("oe-selftest.log")
    os.symlink(log_file, "oe-selftest.log")

    log = logging.getLogger("selftest")
    log.setLevel(logging.DEBUG)

    fh = logging.FileHandler(filename=log_file, mode='w')
    fh.setLevel(logging.DEBUG)

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    log.addHandler(fh)
    log.addHandler(ch)

    return log

# Module-level logger shared by the functions below.
log = logger_create()

def get_args_parser():
    """Build the command line parser for oe-selftest.

    Exactly one of the run/list actions must be given (they share a required
    mutually exclusive group); the coverage and machine options can be
    combined with any of them.

    Returns:
        The configured argparse_oe.ArgumentParser.
    """
    description = "Script that runs unit tests agains bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
    parser = argparse_oe.ArgumentParser(description=description)

    # The actions: one, and only one, of these has to be selected.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-r', '--run-tests', required=False, action='store', nargs='*', dest="run_tests", default=None,
                       help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    group.add_argument('-a', '--run-all-tests', required=False, action="store_true", dest="run_all_tests", default=False,
                       help='Run all (unhidden) tests')
    group.add_argument('-m', '--list-modules', required=False, action="store_true", dest="list_modules", default=False,
                       help='List all available test modules.')
    group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", default=False,
                       help='List all available test classes.')

    # Coverage options, independent from the selected action.
    parser.add_argument('--coverage', action="store_true",
                        help="Run code coverage when testing")
    parser.add_argument('--coverage-source', dest="coverage_source", nargs="+",
                        help="Specifiy the directories to take coverage from")
    parser.add_argument('--coverage-include', dest="coverage_include", nargs="+",
                        help="Specify extra patterns to include into the coverage measurement")
    parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+",
                        help="Specify with extra patterns to exclude from the coverage measurement")

    group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*',
                       help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*',
                       help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    group.add_argument('-l', '--list-tests', required=False, action="store_true", dest="list_tests", default=False,
                       help='List all available tests.')
    group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true",
                       help='List all tags that have been set to test cases.')

    parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None,
                        help='Run tests on different machines (random/all).')
    return parser

def preflight_check():
    """Verify the build environment before any test is run.

    Checks that BUILDDIR is set, switches the working directory to it when
    needed, checks that meta-selftest is part of BBLAYERS and finally runs
    "bitbake -p".

    Returns:
        True when everything is in order, False otherwise.
    """
    log.info("Checking that everything is in order before running the tests")

    builddir = os.environ.get("BUILDDIR")
    if not builddir:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    # Tests are expected to run from inside the build directory
    if builddir != os.getcwd():
        log.info("Changing cwd to %s" % builddir)
        os.chdir(builddir)

    layers = get_bb_var("BBLAYERS")
    if "meta-selftest" not in layers:
        log.error("You don't seem to have the meta-selftest layer in BBLAYERS")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")

    return True

def add_include():
    """Hook the selftest include files into the build configuration.

    Appends the oe-selftest marker comment plus the "include" lines to
    conf/local.conf and conf/bblayers.conf under BUILDDIR, unless the marker
    is already present in the respective file.
    """
    builddir = os.environ.get("BUILDDIR")

    local_conf = os.path.join(builddir, "conf/local.conf")
    if "#include added by oe-selftest.py" not in ftools.read_file(local_conf):
        log.info("Adding: \"include selftest.inc\" in local.conf")
        ftools.append_file(local_conf,
                "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc")

    bblayers_conf = os.path.join(builddir, "conf/bblayers.conf")
    if "#include added by oe-selftest.py" not in ftools.read_file(bblayers_conf):
        log.info("Adding: \"include bblayers.inc\" in bblayers.conf")
        ftools.append_file(bblayers_conf,
                "\n#include added by oe-selftest.py\ninclude bblayers.inc")

def remove_include():
    """Undo add_include(): drop the oe-selftest include lines again.

    Does nothing when BUILDDIR is not set.  Otherwise the marker comment and
    the include lines are removed from conf/local.conf and
    conf/bblayers.conf, each only if the marker is found in that file.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        return

    marker = "#include added by oe-selftest.py"
    # (config file, log message, text to strip), handled in this order
    cleanups = (
        ("conf/local.conf",
         "Removing the include from local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Removing the include from bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )
    for relpath, message, snippet in cleanups:
        conf_file = os.path.join(builddir, relpath)
        if marker in ftools.read_file(conf_file):
            log.info(message)
            ftools.remove_from_file(conf_file, snippet)

def remove_inc_files():
    """Best-effort removal of the .inc files generated for a selftest run.

    Removes conf/selftest.inc under BUILDDIR and every test_recipe.inc found
    in the test layer, then conf/bblayers.inc and conf/machine.inc.  Missing
    files are ignored.  Nothing is done when BUILDDIR is not set.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        # On Python 3 os.path.join(None, ...) raises TypeError, which the
        # handlers below do not (and should not) swallow.
        return

    try:
        os.remove(os.path.join(builddir, "conf/selftest.inc"))
        for root, _, files in os.walk(get_test_layer()):
            for f in files:
                if f == 'test_recipe.inc':
                    os.remove(os.path.join(root, f))
    except (AttributeError, OSError):
        # Best effort: a file that is already gone is not an error
        pass

    for incl_file in ['conf/bblayers.inc', 'conf/machine.inc']:
        try:
            os.remove(os.path.join(builddir, incl_file))
        except OSError:
            pass

def get_tests_modules(include_hidden=False):
    """Return the dotted names of the available selftest modules.

    Every directory on oeqa.selftest.__path__ is walked recursively and each
    "*.py" file becomes "oeqa.selftest[.<subdirs>].<name>".  Files starting
    with "__" and "base.py" are always skipped.

    Args:
        include_hidden: also return modules whose file name starts with a
            single underscore.

    Returns:
        A list of unique module names, in discovery order.
    """
    modules_list = []
    for modules_path in oeqa.selftest.__path__:
        for dirpath, _, filenames in os.walk(modules_path):
            # Whatever follows the last "selftest" in the path is the
            # sub-package part, e.g. "/sub/dir" -> ".sub.dir"
            submodules = dirpath.split("selftest")[-1]
            package = 'oeqa.selftest' + submodules.replace("/", ".")
            for fname in sorted(filenames):
                if not fname.endswith('.py') or fname.startswith('__') or fname == 'base.py':
                    continue
                if fname.startswith('_') and not include_hidden:
                    continue
                # Strip only the trailing ".py"
                module = package + "." + fname[:-len('.py')]
                if module not in modules_list:
                    modules_list.append(module)
    return modules_list

def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the fully qualified names of the tests to load.

    Args:
        exclusive_modules: names relative to "oeqa.selftest" (modules,
            classes or test methods).  When empty or None, every available
            test module is returned instead.
        include_hidden: passed to get_tests_modules() when no explicit
            selection is given.

    Returns:
        A list of dotted names prefixed with "oeqa.selftest.".
    """
    test_modules = ['oeqa.selftest.' + x for x in (exclusive_modules or [])]
    if not test_modules:
        test_modules = get_tests_modules(include_hidden)

    return test_modules

class Tc:
    """Plain record describing one selftest test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        """Store the test case attributes and derive its full dotted path.

        Args:
            tcname: name of the test method.
            tcclass: name of the class defining the test.
            tcmodule: module name relative to "oeqa.selftest".
            tcid: test case id, or None when not set.
            tctag: feature tag; a tuple when there are several tags,
                otherwise a str (or None when not set).
        """
        self.tcname = tcname
        self.tcclass = tcclass
        self.tcmodule = tcmodule
        self.tcid = tcid
        # A test case can have multiple tags (as tuples) otherwise str will suffice
        self.tctag = tctag
        self.fullpath = '.'.join(['oeqa', 'selftest', tcmodule, tcclass, tcname])

def get_tests_from_module(tmod):
    """Collect the test cases defined by one selftest module.

    The module is imported and every oeSelfTest subclass found in its
    namespace is scanned for callable attributes named "test_*".

    Args:
        tmod: dotted name of the module to import.

    Returns:
        A list of Tc objects.  The list may be empty or partial when the
        module cannot be imported or inspected; the problem is only logged
        at debug level.
    """
    tlist = []
    prefix = 'oeqa.selftest.'

    try:
        import importlib
        modlib = importlib.import_module(tmod)
        for mod in list(vars(modlib).values()):
            if not (isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest) and mod is not oeSelfTest):
                continue
            for test in dir(mod):
                if not test.startswith('test_'):
                    continue
                # dir() also lists inherited tests, which are not in
                # vars(mod); getattr() resolves both kinds.
                func = getattr(mod, test, None)
                if not callable(func):
                    continue

                # Get test case id and feature tag; both are optional
                try:
                    tid = func.test_case
                except AttributeError:
                    print('DEBUG: tc id missing for ' + str(test))
                    tid = None
                try:
                    ttag = func.tag__feature
                except AttributeError:
                    ttag = None

                # NOTE: for some reason lstrip() doesn't work for mod.__module__
                tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag))
    except Exception as e:
        # Best effort: a module that cannot be loaded contributes no tests
        log.debug("Could not collect tests from %s: %s", tmod, e)

    return tlist

def get_all_tests():
    """Return the test cases of every non-hidden test module.

    Modules are processed in sorted name order.
    """
    collected = []
    for module_name in sorted(get_tests_modules()):
        collected.extend(get_tests_from_module(module_name))
    return collected

def get_testsuite_by(criteria, keyword):
    """Select test cases by name, class, module, id or tag.

    Args:
        criteria: one of 'name', 'class', 'module', 'id', 'tag'.
        keyword: list of tests, classes, modules, ids or tags to look for;
            entries that match nothing exactly are tried as wildcards.

    Returns:
        A list of the matching test case objects; empty for an unknown
        criteria.
    """
    all_tests = get_all_tests()

    def get_matches(values):
        # Return the items of 'values' selected by the keyword(s), logging
        # an error for every keyword that selects nothing.
        matched = []
        pool = values[:]
        for key in keyword:
            if key in pool:
                # Exact match wins over wildcard interpretation
                matched.append(key)
                pool.remove(key)
                continue
            regex = re.compile(fnmatch.translate(r"%s" % key))
            hits = [item for item in pool if regex.match(item)]
            if not hits:
                log.error("Failed to find test: %s" % key)
                continue
            matched.extend(hits)
            pool = [item for item in pool if item not in hits]
        return matched

    # Criteria that compare a single string-valued field of the test case
    simple_fields = {
        'name': lambda tc: tc.tcname,
        'class': lambda tc: tc.tcclass,
        'module': lambda tc: tc.tcmodule,
        'id': lambda tc: str(tc.tcid),
    }
    if criteria in simple_fields:
        field = simple_fields[criteria]
        wanted = get_matches([field(tc) for tc in all_tests])
        return [tc for tc in all_tests if field(tc) in wanted]

    if criteria != 'tag':
        return []

    # tc can have multiple tags (as tuple) otherwise str will suffice
    known_tags = set()
    for tc in all_tests:
        if isinstance(tc.tctag, tuple):
            known_tags.update(str(tag) for tag in tc.tctag)
        else:
            known_tags.add(str(tc.tctag))

    tags = get_matches(list(known_tags))

    # A set keeps a test case matched by several tags from showing up twice
    selected = set()
    for tc in all_tests:
        for tag in tags:
            if (isinstance(tc.tctag, tuple) and tag in tc.tctag) or tag == tc.tctag:
                selected.add(tc)
    return list(selected)

def list_testsuite_by(criteria, keyword):
    """Print a table of the test cases selected by get_testsuite_by().

    Args:
        criteria: one of 'name', 'class', 'module', 'id', 'tag'.
        keyword: list of tests, classes, modules, ids or tags to look for.
    """
    def sort_key(row):
        # Ids and tags may be None (and tags may be str or tuple); comparing
        # those directly raises TypeError on Python 3.  Rows without an id
        # are sorted last and tags are compared by their string form.
        tcid, tctag = row[0], row[1]
        return (tcid is None, 0 if tcid is None else tcid, str(tctag)) + row[2:]

    rows = sorted(((tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule)
                   for tc in get_testsuite_by(criteria, keyword)), key=sort_key)

    row_fmt = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    print(row_fmt % ('id', 'tag', 'name', 'class', 'module'))
    print('_' * 150)
    for row in rows:
        if isinstance(row[1], (tuple, list)):
            print(row_fmt % (row[0], ', '.join(row[1]), row[2], row[3], row[4]))
        else:
            print(row_fmt % row)
    print('_' * 150)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(rows))

def list_tests():
    """Print a table (id, tag, dotted test path) of all available tests."""
    all_tcs = get_all_tests()
    row_fmt = '%-4s\t%-10s\t%-50s'

    print(row_fmt % ('id', 'tag', 'test'))
    print('_' * 80)
    for tc in all_tcs:
        tag = tc.tctag
        if isinstance(tag, (tuple, list)):
            # Several tags are shown comma separated in one column
            tag = ', '.join(tag)
        print(row_fmt % (tc.tcid, tag, '.'.join([tc.tcmodule, tc.tcclass, tc.tcname])))
    print('_' * 80)
    print('Total found:\t %s' % len(all_tcs))

def list_tags():
    """Print every distinct tag set on the available test cases.

    This is useful when setting tags to test cases; the list of tags should
    be kept as minimal as possible.
    """
    tags = set()
    for tc in get_all_tests():
        if isinstance(tc.tctag, (tuple, list)):
            tags |= set(tc.tctag)
        else:
            tags.add(tc.tctag)

    print('Tags:\t%s' % ', '.join(map(str, tags)))

def coverage_setup(coverage_source, coverage_include, coverage_omit):
    """Set up the coverage measurement for the testcases to be run.

    Writes a ".coveragerc" file into BUILDDIR that records the command line
    and the current git HEAD, enables branch coverage and points the data
    file at a timestamped ".coverage.<timestamp>" file in BUILDDIR.

    Args:
        coverage_source: directories to measure; when empty, the BBLAYERS
            entries, this script's directory and the sibling "bitbake"
            directory are used.
        coverage_include: extra patterns for the "include" setting, or None.
        coverage_omit: extra patterns for the "omit" setting, or None.

    Returns:
        The path of the generated configuration file.
    """
    import datetime
    import subprocess

    builddir = os.environ.get("BUILDDIR")
    scriptsdir = os.path.dirname(os.path.realpath(__file__))
    pokydir = os.path.dirname(scriptsdir)
    curcommit = subprocess.check_output(["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]).decode('utf-8')
    coveragerc = "%s/.coveragerc" % builddir
    data_file = "%s/.coverage." % builddir
    data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
    if os.path.isfile(data_file):
        os.remove(data_file)

    with open(coveragerc, 'w') as cps:
        cps.write("# Generated with command '%s'\n" % " ".join(sys.argv))
        cps.write("# HEAD commit %s\n" % curcommit.strip())
        cps.write("[run]\n")
        cps.write("data_file = %s\n" % data_file)
        cps.write("branch = True\n")
        # Measure just BBLAYERS, scripts and bitbake folders
        cps.write("source = \n")
        if coverage_source:
            for directory in coverage_source:
                if not os.path.isdir(directory):
                    # Only a warning: the entry is still written out
                    log.warning("Directory %s is not valid.", directory)
                cps.write("    %s\n" % directory)
        else:
            for layer in get_bb_var('BBLAYERS').split():
                cps.write("    %s\n" % layer)
            cps.write("    %s\n" % scriptsdir)
            cps.write("    %s\n" % os.path.join(pokydir, 'bitbake'))

        if coverage_include:
            cps.write("include = \n")
            for pattern in coverage_include:
                cps.write("    %s\n" % pattern)
        if coverage_omit:
            cps.write("omit = \n")
            for pattern in coverage_omit:
                cps.write("    %s\n" % pattern)

        return coveragerc

def coverage_report():
    """Load the coverage data gathered and report it back through the log.

    The coverage configuration is read from the file named by the
    COVERAGE_PROCESS_START environment variable.  Reporting problems
    (CoverageException) are logged as warnings instead of being raised.
    """
    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage

    import io
    from coverage.misc import CoverageException

    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file = os.environ.get('COVERAGE_PROCESS_START'))
    with io.StringIO() as cov_output:
        try:
            # Load data from the data file specified in the configuration
            cov.load()
            # Store report data in the in-memory buffer
            cov.report(file = cov_output, show_missing=False)
            log.info("\n%s" % cov_output.getvalue())
        except CoverageException as e:
            # Show problems with the reporting. Since Coverage4 not finding any data to report raises an exception
            log.warning("%s" % str(e))

def main():
    """Parse the command line and run the selected list or run action.

    Returns:
        0 when the selected tests ran successfully, 1 on an invalid
        selection, a failed preflight check, a failed test import or failed
        tests, and None for the pure listing actions.
    """
    import importlib

    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = get_bb_var('BBPATH').split(':')
    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
    sys.path.extend(layer_libdirs)
    # importlib.reload() is the replacement for the deprecated imp.reload()
    importlib.reload(oeqa.selftest)

    valid_options = ['name', 'class', 'module', 'id', 'tag']

    ts = []
    if args.run_tests_by:
        if len(args.run_tests_by) < 2:
            # Without this check the run section below would hit an
            # undefined test list.
            print('--run-tests-by requires a criteria and at least one keyword: <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
            return 1
        if args.run_tests_by[0] not in valid_options:
            print('--run-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.run_tests_by[0])
            return 1
        criteria = args.run_tests_by[0]
        keyword = args.run_tests_by[1:]
        ts = sorted([ tc.fullpath for tc in get_testsuite_by(criteria, keyword) ])
        if not ts:
            return 1

    if args.list_tests_by and len(args.list_tests_by) >= 2:
        if args.list_tests_by[0] not in valid_options:
            print('--list-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.list_tests_by[0])
            return 1
        criteria = args.list_tests_by[0]
        keyword = args.list_tests_by[1:]
        list_testsuite_by(criteria, keyword)

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    if args.list_allclasses:
        # Listing classes is listing modules plus their classes and tests
        args.list_modules = True

    if args.list_modules:
        log.info('Listing all available test modules:')
        testslist = get_tests(include_hidden=True)
        for test in testslist:
            module = test.split('oeqa.selftest.')[-1]
            info = ''
            if module.startswith('_'):
                info = ' (hidden)'
            print(module + info)
            if args.list_allclasses:
                try:
                    modlib = importlib.import_module(test)
                    for name, obj in list(vars(modlib).items()):
                        if isinstance(obj, type(oeSelfTest)) and issubclass(obj, oeSelfTest) and obj is not oeSelfTest:
                            print(" --", name)
                            for method in dir(obj):
                                # getattr() also resolves inherited tests;
                                # callable() replaces collections.Callable,
                                # which no longer exists on Python 3.10+.
                                if method.startswith("test_") and callable(getattr(obj, method, None)):
                                    print(" --  --", method)

                except (AttributeError, ImportError) as e:
                    print(e)

    if args.run_tests or args.run_all_tests or args.run_tests_by:
        if not preflight_check():
            return 1

        if args.run_tests_by:
            testslist = ts
        else:
            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

        suite = unittest.TestSuite()
        loader = unittest.TestLoader()
        loader.sortTestMethodsUsing = None
        runner = TestRunner(verbosity=2,
                resultclass=buildResultClass(args))
        # we need to do this here, otherwise just loading the tests
        # will take 2 minutes (bitbake -e calls)
        oeSelfTest.testlayer_path = get_test_layer()
        for test in testslist:
            log.info("Loading tests from: %s" % test)
            try:
                suite.addTests(loader.loadTestsFromName(test))
            except AttributeError as e:
                log.error("Failed to import %s" % test)
                log.error(e)
                return 1
        add_include()

        if args.machine:
            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
            if args.machine == 'random':
                os.environ['CUSTOMMACHINE'] = 'random'
                success = runner.run(suite).wasSuccessful()
            else:  # all
                machines = get_available_machines()
                if not machines:
                    log.error('No machines available to run the tests on')
                # A failure on any machine fails the whole run, not just
                # a failure on the last one.
                success = bool(machines)
                # NOTE(review): the same suite object is run once per
                # machine; confirm the tests are still present in the suite
                # after the first run with the unittest version in use.
                for m in machines:
                    log.info('Run tests with custom MACHINE set to: %s' % m)
                    os.environ['CUSTOMMACHINE'] = m
                    if not runner.run(suite).wasSuccessful():
                        success = False
        else:
            success = runner.run(suite).wasSuccessful()

        log.info("Finished")

        return 0 if success else 1

def buildResultClass(args):
    """Build a Result Class to use in the testcase execution.

    Args:
        args: parsed command line options; the coverage, coverage_source,
            coverage_include and coverage_omit attributes are read.

    Returns:
        The StampedResult class (not an instance), bound to 'args'.
    """
    import site

    def coverage_requested():
        # Any of the coverage options turns the measurement on
        return bool(args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit)

    class StampedResult(TestResult):
        """
        Custom TestResult that prints the time when a test starts.  As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """

            # variable holding the coverage configuration file allowing subprocess to be measured
            self.coveragepth = None

            # indicates the system if coverage is currently installed
            self.coverage_installed = True

            if not coverage_requested():
                return

            try:
                # check if user can do coverage
                import coverage
            except ImportError:
                log.warning("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
                self.coverage_installed = False
                return

            log.info("Coverage is enabled")

            # Parse the whole leading component: taking only the first
            # character would read version "10.x" as major version 1.
            major_version = int(coverage.version.__version__.split('.')[0])
            if major_version < 4:
                log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__)
                self.stop()
            # In case the user has not set the variable COVERAGE_PROCESS_START,
            # create a default one and export it. The COVERAGE_PROCESS_START
            # value indicates where the coverage configuration file resides
            # More info on https://pypi.python.org/pypi/coverage
            if not os.environ.get('COVERAGE_PROCESS_START'):
                os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)

            # Use default site.USER_SITE and write corresponding config file
            site.ENABLE_USER_SITE = True
            if not os.path.exists(site.USER_SITE):
                os.makedirs(site.USER_SITE)
            self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
            with open(self.coveragepth, 'w') as cps:
                cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """

            if not (coverage_requested() and self.coverage_installed):
                return

            with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                log.info("===========================")
                log.info("\n%s" % "".join(ccf.readlines()))

            log.info("Coverage Report")
            log.info("===============")
            try:
                coverage_report()
            finally:
                # remove the pth file
                try:
                    os.remove(self.coveragepth)
                except OSError:
                    log.warning("Expected temporal file from coverage is missing, ignoring removal.")

    return StampedResult

class TestRunner(_TestRunner):
    """Test runner class aware of exporting tests."""

    def __init__(self, *args, **kwargs):
        """Initialise the runner, exporting results when the base runner can.

        The base runner is first given the extra 'output' (a directory named
        after log_prefix in the current directory) and 'descriptions'
        arguments.  If it rejects them with a TypeError it is initialised
        with the caller's arguments only.
        """
        try:
            exportdir = os.path.join(os.getcwd(), log_prefix)
            kwargsx = dict(**kwargs)
            # argument specific to XMLTestRunner, if adding a new runner then
            # also add logic to use other runner's args.
            kwargsx['output'] = exportdir
            kwargsx['descriptions'] = False
            # done for the case where telling the runner where to export
            super(TestRunner, self).__init__(*args, **kwargsx)
        except TypeError:
            log.info("test runner init'ed like unittest")
            super(TestRunner, self).__init__(*args, **kwargs)

# Script entry point: run main() and always undo the configuration changes
# made for the test run, whatever the outcome.
if __name__ == "__main__":
    try:
        ret = main()
    except Exception:
        ret = 1
        import traceback
        traceback.print_exc()
    finally:
        remove_include()
        remove_inc_files()
    # Kept outside the try/finally: when main() leaves through SystemExit
    # (e.g. argparse --help) 'ret' is never assigned and that exit has to
    # propagate untouched.
    sys.exit(ret)