mirror of
git://git.yoctoproject.org/poky.git
synced 2025-07-19 21:09:03 +02:00
scripts/oe-selftest: Migrate to new framework into oeqa.selftest.context
The new OEQA framework aims to re-use code into the different Test components. The previous oe-selftest implements it-self loading, run, and list test cases in a non-standard way (unittest base) and other functionalities like logging that is now on oeqa core. This ends on a compact oe-selftest script. All needed command line options was migrated but there are some of them pending of implementation and others deprecated. Deprecated options: list-tags: The tag functionality into the old oeqa framework isn't work, the selftest doesn't has tag decorators. {run, list}-tests-by: Ambiguos options it accepts all the posibilites module, class, name, id or tag. Remaining to implement: coverage: It enables covrage reports over a test run, currently isn't on on use and some bugs [1], i filed a bug to add support to OEQA core module in this way other Test components could enable it. repository: It push XML results into a git repository and isn't in use, i filed a bug to implement this into OEQA core module. [2] [1] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11582#c0 [2] https://bugzilla.yoctoproject.org/show_bug.cgi?id=11583#c0 (From OE-Core rev: 3b2a20eee4a39f40287bf67545839eaa09fc892d) Signed-off-by: Leonardo Sandoval <leonardo.sandoval.gonzalez@linux.intel.com> Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
parent
d09938a608
commit
10c512b60d
224
meta/lib/oeqa/selftest/context.py
Normal file
224
meta/lib/oeqa/selftest/context.py
Normal file
|
@ -0,0 +1,224 @@
|
|||
# Copyright (C) 2017 Intel Corporation
|
||||
# Released under the MIT license (see COPYING.MIT)
|
||||
|
||||
import os
|
||||
import time
|
||||
import glob
|
||||
import sys
|
||||
import imp
|
||||
from random import choice
|
||||
|
||||
import oeqa
|
||||
|
||||
from oeqa.core.context import OETestContext, OETestContextExecutor
|
||||
from oeqa.core.exception import OEQAPreRun
|
||||
|
||||
from oeqa.utils.commands import runCmd, get_bb_vars, get_test_layer
|
||||
|
||||
class OESelftestTestContext(OETestContext):
|
||||
def __init__(self, td=None, logger=None, machines=None, testlayer_path=None):
|
||||
super(OESelftestTestContext, self).__init__(td, logger)
|
||||
|
||||
self.machines = machines
|
||||
self.custommachine = None
|
||||
|
||||
self.testlayer_path = testlayer_path
|
||||
|
||||
def runTests(self, machine=None):
|
||||
if machine:
|
||||
self.custommachine = machine
|
||||
if machine == 'random':
|
||||
self.custommachine = choice(self.machines)
|
||||
self.logger.info('Run tests with custom MACHINE set to: %s' % \
|
||||
self.custommachine)
|
||||
return super(OESelftestTestContext, self).runTests()
|
||||
|
||||
def listTests(self, display_type, machine=None):
|
||||
return super(OESelftestTestContext, self).listTests(display_type)
|
||||
|
||||
class OESelftestTestContextExecutor(OETestContextExecutor):
|
||||
_context_class = OESelftestTestContext
|
||||
_script_executor = 'oe-selftest'
|
||||
|
||||
name = 'oe-selftest'
|
||||
help = 'oe-selftest test component'
|
||||
description = 'Executes selftest tests'
|
||||
|
||||
def register_commands(self, logger, parser):
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
|
||||
group.add_argument('-a', '--run-all-tests', default=False,
|
||||
action="store_true", dest="run_all_tests",
|
||||
help='Run all (unhidden) tests')
|
||||
group.add_argument('-r', '--run-tests', required=False, action='store',
|
||||
nargs='+', dest="run_tests", default=None,
|
||||
help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
|
||||
|
||||
group.add_argument('-m', '--list-modules', required=False,
|
||||
action="store_true", default=False,
|
||||
help='List all available test modules.')
|
||||
group.add_argument('--list-classes', required=False,
|
||||
action="store_true", default=False,
|
||||
help='List all available test classes.')
|
||||
group.add_argument('-l', '--list-tests', required=False,
|
||||
action="store_true", default=False,
|
||||
help='List all available tests.')
|
||||
|
||||
parser.add_argument('--machine', required=False, choices=['random', 'all'],
|
||||
help='Run tests on different machines (random/all).')
|
||||
|
||||
parser.set_defaults(func=self.run)
|
||||
|
||||
def _get_available_machines(self):
|
||||
machines = []
|
||||
|
||||
bbpath = self.tc_kwargs['init']['td']['BBPATH'].split(':')
|
||||
|
||||
for path in bbpath:
|
||||
found_machines = glob.glob(os.path.join(path, 'conf', 'machine', '*.conf'))
|
||||
if found_machines:
|
||||
for i in found_machines:
|
||||
# eg: '/home/<user>/poky/meta-intel/conf/machine/intel-core2-32.conf'
|
||||
machines.append(os.path.splitext(os.path.basename(i))[0])
|
||||
|
||||
return machines
|
||||
|
||||
def _get_cases_paths(self, bbpath):
|
||||
cases_paths = []
|
||||
for layer in bbpath:
|
||||
cases_dir = os.path.join(layer, 'lib', 'oeqa', 'selftest', 'cases')
|
||||
if os.path.isdir(cases_dir):
|
||||
cases_paths.append(cases_dir)
|
||||
return cases_paths
|
||||
|
||||
def _process_args(self, logger, args):
|
||||
args.output_log = '%s-results-%s.log' % (self.name,
|
||||
time.strftime("%Y%m%d%H%M%S"))
|
||||
args.test_data_file = None
|
||||
args.CASES_PATHS = None
|
||||
|
||||
super(OESelftestTestContextExecutor, self)._process_args(logger, args)
|
||||
|
||||
if args.list_modules:
|
||||
args.list_tests = 'module'
|
||||
elif args.list_classes:
|
||||
args.list_tests = 'class'
|
||||
elif args.list_tests:
|
||||
args.list_tests = 'name'
|
||||
|
||||
self.tc_kwargs['init']['td'] = get_bb_vars()
|
||||
self.tc_kwargs['init']['machines'] = self._get_available_machines()
|
||||
self.tc_kwargs['init']['testlayer_path'] = get_test_layer()
|
||||
|
||||
def _pre_run(self):
|
||||
def _check_required_env_variables(vars):
|
||||
for var in vars:
|
||||
if not os.environ.get(var):
|
||||
self.tc.logger.error("%s is not set. Did you forget to source your build environment setup script?" % var)
|
||||
raise OEQAPreRun
|
||||
|
||||
def _check_presence_meta_selftest():
|
||||
builddir = os.environ.get("BUILDDIR")
|
||||
if os.getcwd() != builddir:
|
||||
self.tc.logger.info("Changing cwd to %s" % builddir)
|
||||
os.chdir(builddir)
|
||||
|
||||
if not "meta-selftest" in self.tc.td["BBLAYERS"]:
|
||||
self.tc.logger.warn("meta-selftest layer not found in BBLAYERS, adding it")
|
||||
meta_selftestdir = os.path.join(
|
||||
self.tc.td["BBLAYERS_FETCH_DIR"], 'meta-selftest')
|
||||
if os.path.isdir(meta_selftestdir):
|
||||
runCmd("bitbake-layers add-layer %s" %meta_selftestdir)
|
||||
# reload data is needed because a meta-selftest layer was add
|
||||
self.tc.td = get_bb_vars()
|
||||
else:
|
||||
self.tc.logger.error("could not locate meta-selftest in:\n%s" % meta_selftestdir)
|
||||
raise OEQAPreRun
|
||||
|
||||
def _add_layer_libs():
|
||||
bbpath = self.tc.td['BBPATH'].split(':')
|
||||
layer_libdirs = [p for p in (os.path.join(l, 'lib') \
|
||||
for l in bbpath) if os.path.exists(p)]
|
||||
if layer_libdirs:
|
||||
self.tc.logger.info("Adding layer libraries:")
|
||||
for l in layer_libdirs:
|
||||
self.tc.logger.info("\t%s" % l)
|
||||
|
||||
sys.path.extend(layer_libdirs)
|
||||
imp.reload(oeqa.selftest)
|
||||
|
||||
_check_required_env_variables(["BUILDDIR"])
|
||||
_check_presence_meta_selftest()
|
||||
|
||||
if "buildhistory.bbclass" in self.tc.td["BBINCLUDED"]:
|
||||
self.tc.logger.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.")
|
||||
raise OEQAPreRun
|
||||
|
||||
if "PRSERV_HOST" in self.tc.td:
|
||||
self.tc.logger.error("Please unset PRSERV_HOST in order to run oe-selftest")
|
||||
raise OEQAPreRun
|
||||
|
||||
if "SANITY_TESTED_DISTROS" in self.tc.td:
|
||||
self.tc.logger.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest")
|
||||
raise OEQAPreRun
|
||||
|
||||
_add_layer_libs()
|
||||
|
||||
self.tc.logger.info("Running bitbake -p")
|
||||
runCmd("bitbake -p")
|
||||
|
||||
def _internal_run(self, logger, args):
|
||||
self.module_paths = self._get_cases_paths(
|
||||
self.tc_kwargs['init']['td']['BBPATH'].split(':'))
|
||||
|
||||
self.tc = self._context_class(**self.tc_kwargs['init'])
|
||||
self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
|
||||
|
||||
if args.list_tests:
|
||||
rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['run'])
|
||||
else:
|
||||
self._pre_run()
|
||||
rc = self.tc.runTests(**self.tc_kwargs['run'])
|
||||
rc.logSummary(self.name)
|
||||
rc.logDetails()
|
||||
|
||||
return rc
|
||||
|
||||
def run(self, logger, args):
|
||||
self._process_args(logger, args)
|
||||
rc = None
|
||||
|
||||
if args.machine:
|
||||
logger.info('Custom machine mode enabled. MACHINE set to %s' %
|
||||
args.machine)
|
||||
|
||||
if args.machine == 'all':
|
||||
results = []
|
||||
for m in self.tc_kwargs['init']['machines']:
|
||||
self.tc_kwargs['run']['machine'] = m
|
||||
results.append(self._internal_run(logger, args))
|
||||
|
||||
# XXX: the oe-selftest script only needs to know if one
|
||||
# machine run fails
|
||||
for r in results:
|
||||
rc = r
|
||||
if not r.wasSuccessful():
|
||||
break
|
||||
|
||||
else:
|
||||
self.tc_kwargs['run']['machine'] = args.machine
|
||||
return self._internal_run(logger, args)
|
||||
|
||||
else:
|
||||
self.tc_kwargs['run']['machine'] = args.machine
|
||||
rc = self._internal_run(logger, args)
|
||||
|
||||
output_link = os.path.join(os.path.dirname(args.output_log),
|
||||
"%s-results.log" % self.name)
|
||||
if os.path.exists(output_link):
|
||||
os.remove(output_link)
|
||||
os.symlink(args.output_log, output_link)
|
||||
|
||||
return rc
|
||||
|
||||
_executor_class = OESelftestTestContextExecutor
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# Copyright (c) 2013 Intel Corporation
|
||||
# Copyright (c) 2013-2017 Intel Corporation
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License version 2 as
|
||||
|
@ -25,732 +25,47 @@
|
|||
# E.g: "oe-selftest -r bblayers.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/oeqa/selftest/bblayers.py
|
||||
|
||||
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
import logging
|
||||
import argparse
|
||||
import subprocess
|
||||
import time as t
|
||||
import re
|
||||
import fnmatch
|
||||
import collections
|
||||
import imp
|
||||
import logging
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
|
||||
import scriptpath
|
||||
scriptpath.add_bitbake_lib_path()
|
||||
scriptpath.add_oe_lib_path()
|
||||
scripts_path = os.path.dirname(os.path.realpath(__file__))
|
||||
lib_path = scripts_path + '/lib'
|
||||
sys.path = sys.path + [lib_path]
|
||||
import argparse_oe
|
||||
import scriptutils
|
||||
import scriptpath
|
||||
scriptpath.add_oe_lib_path()
|
||||
scriptpath.add_bitbake_lib_path()
|
||||
|
||||
import oeqa.selftest
|
||||
import oeqa.utils.ftools as ftools
|
||||
from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer
|
||||
from oeqa.utils.metadata import metadata_from_bb, write_metadata_file
|
||||
from oeqa.selftest.base import oeSelfTest, get_available_machines
|
||||
|
||||
try:
|
||||
import xmlrunner
|
||||
from xmlrunner.result import _XMLTestResult as TestResult
|
||||
from xmlrunner import XMLTestRunner as _TestRunner
|
||||
except ImportError:
|
||||
# use the base runner instead
|
||||
from unittest import TextTestResult as TestResult
|
||||
from unittest import TextTestRunner as _TestRunner
|
||||
|
||||
log_prefix = "oe-selftest-" + t.strftime("%Y%m%d-%H%M%S")
|
||||
|
||||
def logger_create():
|
||||
log_file = log_prefix + ".log"
|
||||
if os.path.lexists("oe-selftest.log"):
|
||||
os.remove("oe-selftest.log")
|
||||
os.symlink(log_file, "oe-selftest.log")
|
||||
|
||||
log = logging.getLogger("selftest")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
fh = logging.FileHandler(filename=log_file, mode='w')
|
||||
fh.setLevel(logging.DEBUG)
|
||||
|
||||
ch = logging.StreamHandler(sys.stdout)
|
||||
ch.setLevel(logging.INFO)
|
||||
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
fh.setFormatter(formatter)
|
||||
ch.setFormatter(formatter)
|
||||
|
||||
log.addHandler(fh)
|
||||
log.addHandler(ch)
|
||||
|
||||
return log
|
||||
|
||||
log = logger_create()
|
||||
|
||||
def get_args_parser():
|
||||
description = "Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
|
||||
parser = argparse_oe.ArgumentParser(description=description)
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument('-r', '--run-tests', required=False, action='store', nargs='*', dest="run_tests", default=None, help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
|
||||
group.add_argument('-a', '--run-all-tests', required=False, action="store_true", dest="run_all_tests", default=False, help='Run all (unhidden) tests')
|
||||
group.add_argument('-m', '--list-modules', required=False, action="store_true", dest="list_modules", default=False, help='List all available test modules.')
|
||||
group.add_argument('--list-classes', required=False, action="store_true", dest="list_allclasses", default=False, help='List all available test classes.')
|
||||
parser.add_argument('--coverage', action="store_true", help="Run code coverage when testing")
|
||||
parser.add_argument('--coverage-source', dest="coverage_source", nargs="+", help="Specifiy the directories to take coverage from")
|
||||
parser.add_argument('--coverage-include', dest="coverage_include", nargs="+", help="Specify extra patterns to include into the coverage measurement")
|
||||
parser.add_argument('--coverage-omit', dest="coverage_omit", nargs="+", help="Specify with extra patterns to exclude from the coverage measurement")
|
||||
group.add_argument('--run-tests-by', required=False, dest='run_tests_by', default=False, nargs='*',
|
||||
help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
|
||||
group.add_argument('--list-tests-by', required=False, dest='list_tests_by', default=False, nargs='*',
|
||||
help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
|
||||
group.add_argument('-l', '--list-tests', required=False, action="store_true", dest="list_tests", default=False,
|
||||
help='List all available tests.')
|
||||
group.add_argument('--list-tags', required=False, dest='list_tags', default=False, action="store_true",
|
||||
help='List all tags that have been set to test cases.')
|
||||
parser.add_argument('--machine', required=False, dest='machine', choices=['random', 'all'], default=None,
|
||||
help='Run tests on different machines (random/all).')
|
||||
parser.add_argument('--repository', required=False, dest='repository', default='', action='store',
|
||||
help='Submit test results to a repository')
|
||||
return parser
|
||||
|
||||
builddir = None
|
||||
|
||||
|
||||
def preflight_check():
|
||||
|
||||
global builddir
|
||||
|
||||
log.info("Checking that everything is in order before running the tests")
|
||||
|
||||
if not os.environ.get("BUILDDIR"):
|
||||
log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
|
||||
return False
|
||||
|
||||
builddir = os.environ.get("BUILDDIR")
|
||||
if os.getcwd() != builddir:
|
||||
log.info("Changing cwd to %s" % builddir)
|
||||
os.chdir(builddir)
|
||||
|
||||
if not "meta-selftest" in get_bb_var("BBLAYERS"):
|
||||
log.warn("meta-selftest layer not found in BBLAYERS, adding it")
|
||||
meta_selftestdir = os.path.join(
|
||||
get_bb_var("BBLAYERS_FETCH_DIR"),
|
||||
'meta-selftest')
|
||||
if os.path.isdir(meta_selftestdir):
|
||||
runCmd("bitbake-layers add-layer %s" %meta_selftestdir)
|
||||
else:
|
||||
log.error("could not locate meta-selftest in:\n%s"
|
||||
%meta_selftestdir)
|
||||
return False
|
||||
|
||||
if "buildhistory.bbclass" in get_bb_var("BBINCLUDED"):
|
||||
log.error("You have buildhistory enabled already and this isn't recommended for selftest, please disable it first.")
|
||||
return False
|
||||
|
||||
if get_bb_var("PRSERV_HOST"):
|
||||
log.error("Please unset PRSERV_HOST in order to run oe-selftest")
|
||||
return False
|
||||
|
||||
if get_bb_var("SANITY_TESTED_DISTROS"):
|
||||
log.error("Please unset SANITY_TESTED_DISTROS in order to run oe-selftest")
|
||||
return False
|
||||
|
||||
log.info("Running bitbake -p")
|
||||
runCmd("bitbake -p")
|
||||
|
||||
return True
|
||||
|
||||
def get_tests_modules(include_hidden=False):
|
||||
modules_list = list()
|
||||
for modules_path in oeqa.selftest.__path__:
|
||||
for (p, d, f) in os.walk(modules_path):
|
||||
files = sorted([f for f in os.listdir(p) if f.endswith('.py') and not (f.startswith('_') and not include_hidden) and not f.startswith('__') and f != 'base.py'])
|
||||
for f in files:
|
||||
submodules = p.split("selftest")[-1]
|
||||
module = ""
|
||||
if submodules:
|
||||
module = 'oeqa.selftest' + submodules.replace("/",".") + "." + f.split('.py')[0]
|
||||
else:
|
||||
module = 'oeqa.selftest.' + f.split('.py')[0]
|
||||
if module not in modules_list:
|
||||
modules_list.append(module)
|
||||
return modules_list
|
||||
|
||||
|
||||
def get_tests(exclusive_modules=[], include_hidden=False):
|
||||
test_modules = list()
|
||||
for x in exclusive_modules:
|
||||
test_modules.append('oeqa.selftest.' + x)
|
||||
if not test_modules:
|
||||
inc_hidden = include_hidden
|
||||
test_modules = get_tests_modules(inc_hidden)
|
||||
|
||||
return test_modules
|
||||
|
||||
|
||||
class Tc:
|
||||
def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
|
||||
self.tcname = tcname
|
||||
self.tcclass = tcclass
|
||||
self.tcmodule = tcmodule
|
||||
self.tcid = tcid
|
||||
# A test case can have multiple tags (as tuples) otherwise str will suffice
|
||||
self.tctag = tctag
|
||||
self.fullpath = '.'.join(['oeqa', 'selftest', tcmodule, tcclass, tcname])
|
||||
|
||||
|
||||
def get_tests_from_module(tmod):
|
||||
tlist = []
|
||||
prefix = 'oeqa.selftest.'
|
||||
|
||||
try:
|
||||
import importlib
|
||||
modlib = importlib.import_module(tmod)
|
||||
for mod in list(vars(modlib).values()):
|
||||
if isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest) and mod is not oeSelfTest:
|
||||
for test in dir(mod):
|
||||
if test.startswith('test_') and hasattr(vars(mod)[test], '__call__'):
|
||||
# Get test case id and feature tag
|
||||
# NOTE: if testcase decorator or feature tag not set will throw error
|
||||
try:
|
||||
tid = vars(mod)[test].test_case
|
||||
except:
|
||||
print('DEBUG: tc id missing for ' + str(test))
|
||||
tid = None
|
||||
try:
|
||||
ttag = vars(mod)[test].tag__feature
|
||||
except:
|
||||
# print('DEBUG: feature tag missing for ' + str(test))
|
||||
ttag = None
|
||||
|
||||
# NOTE: for some reason lstrip() doesn't work for mod.__module__
|
||||
tlist.append(Tc(test, mod.__name__, mod.__module__.replace(prefix, ''), tid, ttag))
|
||||
except:
|
||||
pass
|
||||
|
||||
return tlist
|
||||
|
||||
|
||||
def get_all_tests():
|
||||
# Get all the test modules (except the hidden ones)
|
||||
testlist = []
|
||||
tests_modules = get_tests_modules()
|
||||
# Get all the tests from modules
|
||||
for tmod in sorted(tests_modules):
|
||||
testlist += get_tests_from_module(tmod)
|
||||
return testlist
|
||||
|
||||
|
||||
def get_testsuite_by(criteria, keyword):
|
||||
# Get a testsuite based on 'keyword'
|
||||
# criteria: name, class, module, id, tag
|
||||
# keyword: a list of tests, classes, modules, ids, tags
|
||||
|
||||
ts = []
|
||||
all_tests = get_all_tests()
|
||||
|
||||
def get_matches(values):
|
||||
# Get an item and return the ones that match with keyword(s)
|
||||
# values: the list of items (names, modules, classes...)
|
||||
result = []
|
||||
remaining = values[:]
|
||||
for key in keyword:
|
||||
found = False
|
||||
if key in remaining:
|
||||
# Regular matching of exact item
|
||||
result.append(key)
|
||||
remaining.remove(key)
|
||||
found = True
|
||||
else:
|
||||
# Wildcard matching
|
||||
pattern = re.compile(fnmatch.translate(r"%s" % key))
|
||||
added = [x for x in remaining if pattern.match(x)]
|
||||
if added:
|
||||
result.extend(added)
|
||||
remaining = [x for x in remaining if x not in added]
|
||||
found = True
|
||||
if not found:
|
||||
log.error("Failed to find test: %s" % key)
|
||||
|
||||
return result
|
||||
|
||||
if criteria == 'name':
|
||||
names = get_matches([ tc.tcname for tc in all_tests ])
|
||||
ts = [ tc for tc in all_tests if tc.tcname in names ]
|
||||
|
||||
elif criteria == 'class':
|
||||
classes = get_matches([ tc.tcclass for tc in all_tests ])
|
||||
ts = [ tc for tc in all_tests if tc.tcclass in classes ]
|
||||
|
||||
elif criteria == 'module':
|
||||
modules = get_matches([ tc.tcmodule for tc in all_tests ])
|
||||
ts = [ tc for tc in all_tests if tc.tcmodule in modules ]
|
||||
|
||||
elif criteria == 'id':
|
||||
ids = get_matches([ str(tc.tcid) for tc in all_tests ])
|
||||
ts = [ tc for tc in all_tests if str(tc.tcid) in ids ]
|
||||
|
||||
elif criteria == 'tag':
|
||||
values = set()
|
||||
for tc in all_tests:
|
||||
# tc can have multiple tags (as tuple) otherwise str will suffice
|
||||
if isinstance(tc.tctag, tuple):
|
||||
values |= { str(tag) for tag in tc.tctag }
|
||||
else:
|
||||
values.add(str(tc.tctag))
|
||||
|
||||
tags = get_matches(list(values))
|
||||
|
||||
for tc in all_tests:
|
||||
for tag in tags:
|
||||
if isinstance(tc.tctag, tuple) and tag in tc.tctag:
|
||||
ts.append(tc)
|
||||
elif tag == tc.tctag:
|
||||
ts.append(tc)
|
||||
|
||||
# Remove duplicates from the list
|
||||
ts = list(set(ts))
|
||||
|
||||
return ts
|
||||
|
||||
|
||||
def list_testsuite_by(criteria, keyword):
|
||||
# Get a testsuite based on 'keyword'
|
||||
# criteria: name, class, module, id, tag
|
||||
# keyword: a list of tests, classes, modules, ids, tags
|
||||
def tc_key(t):
|
||||
if t[0] is None:
|
||||
return (0,) + t[1:]
|
||||
return t
|
||||
# tcid may be None if no ID was assigned, in which case sorted() will throw
|
||||
# a TypeError as Python 3 does not allow comparison (<,<=,>=,>) of
|
||||
# heterogeneous types, handle this by using a custom key generator
|
||||
ts = sorted([ (tc.tcid, tc.tctag, tc.tcname, tc.tcclass, tc.tcmodule) \
|
||||
for tc in get_testsuite_by(criteria, keyword) ], key=tc_key)
|
||||
print('_' * 150)
|
||||
for t in ts:
|
||||
if isinstance(t[1], (tuple, list)):
|
||||
print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % (t[0], ', '.join(t[1]), t[2], t[3], t[4]))
|
||||
else:
|
||||
print('%-4s\t%-20s\t%-60s\t%-25s\t%-20s' % t)
|
||||
print('_' * 150)
|
||||
print('Filtering by:\t %s' % criteria)
|
||||
print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
|
||||
print('Total found:\t %s' % len(ts))
|
||||
|
||||
|
||||
def list_tests():
|
||||
# List all available oe-selftest tests
|
||||
|
||||
ts = get_all_tests()
|
||||
|
||||
print('%-4s\t%-10s\t%-50s' % ('id', 'tag', 'test'))
|
||||
print('_' * 80)
|
||||
for t in ts:
|
||||
if isinstance(t.tctag, (tuple, list)):
|
||||
print('%-4s\t%-10s\t%-50s' % (t.tcid, ', '.join(t.tctag), '.'.join([t.tcmodule, t.tcclass, t.tcname])))
|
||||
else:
|
||||
print('%-4s\t%-10s\t%-50s' % (t.tcid, t.tctag, '.'.join([t.tcmodule, t.tcclass, t.tcname])))
|
||||
print('_' * 80)
|
||||
print('Total found:\t %s' % len(ts))
|
||||
|
||||
def list_tags():
|
||||
# Get all tags set to test cases
|
||||
# This is useful when setting tags to test cases
|
||||
# The list of tags should be kept as minimal as possible
|
||||
tags = set()
|
||||
all_tests = get_all_tests()
|
||||
|
||||
for tc in all_tests:
|
||||
if isinstance(tc.tctag, (tuple, list)):
|
||||
tags.update(set(tc.tctag))
|
||||
else:
|
||||
tags.add(tc.tctag)
|
||||
|
||||
print('Tags:\t%s' % ', '.join(str(x) for x in tags))
|
||||
|
||||
def coverage_setup(coverage_source, coverage_include, coverage_omit):
|
||||
""" Set up the coverage measurement for the testcases to be run """
|
||||
import datetime
|
||||
import subprocess
|
||||
global builddir
|
||||
pokydir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
|
||||
curcommit= subprocess.check_output(["git", "--git-dir", os.path.join(pokydir, ".git"), "rev-parse", "HEAD"]).decode('utf-8')
|
||||
coveragerc = "%s/.coveragerc" % builddir
|
||||
data_file = "%s/.coverage." % builddir
|
||||
data_file += datetime.datetime.now().strftime('%Y%m%dT%H%M%S')
|
||||
if os.path.isfile(data_file):
|
||||
os.remove(data_file)
|
||||
with open(coveragerc, 'w') as cps:
|
||||
cps.write("# Generated with command '%s'\n" % " ".join(sys.argv))
|
||||
cps.write("# HEAD commit %s\n" % curcommit.strip())
|
||||
cps.write("[run]\n")
|
||||
cps.write("data_file = %s\n" % data_file)
|
||||
cps.write("branch = True\n")
|
||||
# Measure just BBLAYERS, scripts and bitbake folders
|
||||
cps.write("source = \n")
|
||||
if coverage_source:
|
||||
for directory in coverage_source:
|
||||
if not os.path.isdir(directory):
|
||||
log.warn("Directory %s is not valid.", directory)
|
||||
cps.write(" %s\n" % directory)
|
||||
else:
|
||||
for layer in get_bb_var('BBLAYERS').split():
|
||||
cps.write(" %s\n" % layer)
|
||||
cps.write(" %s\n" % os.path.dirname(os.path.realpath(__file__)))
|
||||
cps.write(" %s\n" % os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))),'bitbake'))
|
||||
|
||||
if coverage_include:
|
||||
cps.write("include = \n")
|
||||
for pattern in coverage_include:
|
||||
cps.write(" %s\n" % pattern)
|
||||
if coverage_omit:
|
||||
cps.write("omit = \n")
|
||||
for pattern in coverage_omit:
|
||||
cps.write(" %s\n" % pattern)
|
||||
|
||||
return coveragerc
|
||||
|
||||
def coverage_report():
|
||||
""" Loads the coverage data gathered and reports it back """
|
||||
try:
|
||||
# Coverage4 uses coverage.Coverage
|
||||
from coverage import Coverage
|
||||
except:
|
||||
# Coverage under version 4 uses coverage.coverage
|
||||
from coverage import coverage as Coverage
|
||||
|
||||
import io as StringIO
|
||||
from coverage.misc import CoverageException
|
||||
|
||||
cov_output = StringIO.StringIO()
|
||||
# Creating the coverage data with the setting from the configuration file
|
||||
cov = Coverage(config_file = os.environ.get('COVERAGE_PROCESS_START'))
|
||||
try:
|
||||
# Load data from the data file specified in the configuration
|
||||
cov.load()
|
||||
# Store report data in a StringIO variable
|
||||
cov.report(file = cov_output, show_missing=False)
|
||||
log.info("\n%s" % cov_output.getvalue())
|
||||
except CoverageException as e:
|
||||
# Show problems with the reporting. Since Coverage4 not finding any data to report raises an exception
|
||||
log.warn("%s" % str(e))
|
||||
finally:
|
||||
cov_output.close()
|
||||
from oeqa.utils import load_test_components
|
||||
from oeqa.core.exception import OEQAPreRun
|
||||
|
||||
logger = scriptutils.logger_create('oe-selftest')
|
||||
|
||||
def main():
|
||||
parser = get_args_parser()
|
||||
args = parser.parse_args()
|
||||
description = "Script that runs unit tests against bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information."
|
||||
parser = argparse_oe.ArgumentParser(description=description)
|
||||
|
||||
# Add <layer>/lib to sys.path, so layers can add selftests
|
||||
log.info("Running bitbake -e to get BBPATH")
|
||||
bbpath = get_bb_var('BBPATH').split(':')
|
||||
layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
|
||||
sys.path.extend(layer_libdirs)
|
||||
imp.reload(oeqa.selftest)
|
||||
comp_name, comp = load_test_components(logger, 'oe-selftest').popitem()
|
||||
comp.register_commands(logger, parser)
|
||||
|
||||
# act like bitbake and enforce en_US.UTF-8 locale
|
||||
os.environ["LC_ALL"] = "en_US.UTF-8"
|
||||
try:
|
||||
args = parser.parse_args()
|
||||
results = args.func(logger, args)
|
||||
ret = 0 if results.wasSuccessful() else 1
|
||||
except SystemExit as err:
|
||||
if err.code != 0:
|
||||
raise err
|
||||
ret = err.code
|
||||
except OEQAPreRun as pr:
|
||||
ret = 1
|
||||
|
||||
if args.run_tests_by and len(args.run_tests_by) >= 2:
|
||||
valid_options = ['name', 'class', 'module', 'id', 'tag']
|
||||
if args.run_tests_by[0] not in valid_options:
|
||||
print('--run-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.run_tests_by[0])
|
||||
return 1
|
||||
else:
|
||||
criteria = args.run_tests_by[0]
|
||||
keyword = args.run_tests_by[1:]
|
||||
ts = sorted([ tc.fullpath for tc in get_testsuite_by(criteria, keyword) ])
|
||||
if not ts:
|
||||
return 1
|
||||
return ret
|
||||
|
||||
if args.list_tests_by and len(args.list_tests_by) >= 2:
|
||||
valid_options = ['name', 'class', 'module', 'id', 'tag']
|
||||
if args.list_tests_by[0] not in valid_options:
|
||||
print('--list-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.list_tests_by[0])
|
||||
return 1
|
||||
else:
|
||||
criteria = args.list_tests_by[0]
|
||||
keyword = args.list_tests_by[1:]
|
||||
list_testsuite_by(criteria, keyword)
|
||||
|
||||
if args.list_tests:
|
||||
list_tests()
|
||||
|
||||
if args.list_tags:
|
||||
list_tags()
|
||||
|
||||
if args.list_allclasses:
|
||||
args.list_modules = True
|
||||
|
||||
if args.list_modules:
|
||||
log.info('Listing all available test modules:')
|
||||
testslist = get_tests(include_hidden=True)
|
||||
for test in testslist:
|
||||
module = test.split('oeqa.selftest.')[-1]
|
||||
info = ''
|
||||
if module.startswith('_'):
|
||||
info = ' (hidden)'
|
||||
print(module + info)
|
||||
if args.list_allclasses:
|
||||
try:
|
||||
import importlib
|
||||
modlib = importlib.import_module(test)
|
||||
for v in vars(modlib):
|
||||
t = vars(modlib)[v]
|
||||
if isinstance(t, type(oeSelfTest)) and issubclass(t, oeSelfTest) and t!=oeSelfTest:
|
||||
print(" --", v)
|
||||
for method in dir(t):
|
||||
if method.startswith("test_") and isinstance(vars(t)[method], collections.Callable):
|
||||
print(" -- --", method)
|
||||
|
||||
except (AttributeError, ImportError) as e:
|
||||
print(e)
|
||||
pass
|
||||
|
||||
if args.run_tests or args.run_all_tests or args.run_tests_by:
|
||||
if not preflight_check():
|
||||
return 1
|
||||
|
||||
if args.run_tests_by:
|
||||
testslist = ts
|
||||
else:
|
||||
testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)
|
||||
|
||||
suite = unittest.TestSuite()
|
||||
loader = unittest.TestLoader()
|
||||
loader.sortTestMethodsUsing = None
|
||||
runner = TestRunner(verbosity=2,
|
||||
resultclass=buildResultClass(args))
|
||||
# we need to do this here, otherwise just loading the tests
|
||||
# will take 2 minutes (bitbake -e calls)
|
||||
oeSelfTest.testlayer_path = get_test_layer()
|
||||
for test in testslist:
|
||||
log.info("Loading tests from: %s" % test)
|
||||
try:
|
||||
suite.addTests(loader.loadTestsFromName(test))
|
||||
except AttributeError as e:
|
||||
log.error("Failed to import %s" % test)
|
||||
log.error(e)
|
||||
return 1
|
||||
|
||||
if args.machine:
|
||||
# Custom machine sets only weak default values (??=) for MACHINE in machine.inc
|
||||
# This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
|
||||
log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
|
||||
if args.machine == 'random':
|
||||
os.environ['CUSTOMMACHINE'] = 'random'
|
||||
result = runner.run(suite)
|
||||
else: # all
|
||||
machines = get_available_machines()
|
||||
for m in machines:
|
||||
log.info('Run tests with custom MACHINE set to: %s' % m)
|
||||
os.environ['CUSTOMMACHINE'] = m
|
||||
result = runner.run(suite)
|
||||
else:
|
||||
result = runner.run(suite)
|
||||
|
||||
log.info("Finished")
|
||||
|
||||
if args.repository:
|
||||
import git
|
||||
# Commit tests results to repository
|
||||
metadata = metadata_from_bb()
|
||||
git_dir = os.path.join(os.getcwd(), 'selftest')
|
||||
if not os.path.isdir(git_dir):
|
||||
os.mkdir(git_dir)
|
||||
|
||||
log.debug('Checking for git repository in %s' % git_dir)
|
||||
try:
|
||||
repo = git.Repo(git_dir)
|
||||
except git.exc.InvalidGitRepositoryError:
|
||||
log.debug("Couldn't find git repository %s; "
|
||||
"cloning from %s" % (git_dir, args.repository))
|
||||
repo = git.Repo.clone_from(args.repository, git_dir)
|
||||
|
||||
r_branches = repo.git.branch(r=True)
|
||||
r_branches = set(r_branches.replace('origin/', '').split())
|
||||
l_branches = {str(branch) for branch in repo.branches}
|
||||
branch = '%s/%s/%s' % (metadata['hostname'],
|
||||
metadata['layers']['meta'].get('branch', '(nogit)'),
|
||||
metadata['config']['MACHINE'])
|
||||
|
||||
if branch in l_branches:
|
||||
log.debug('Found branch in local repository, checking out')
|
||||
repo.git.checkout(branch)
|
||||
elif branch in r_branches:
|
||||
log.debug('Found branch in remote repository, checking'
|
||||
' out and pulling')
|
||||
repo.git.checkout(branch)
|
||||
repo.git.pull()
|
||||
else:
|
||||
log.debug('New branch %s' % branch)
|
||||
repo.git.checkout('master')
|
||||
repo.git.checkout(b=branch)
|
||||
|
||||
cleanResultsDir(repo)
|
||||
xml_dir = os.path.join(os.getcwd(), log_prefix)
|
||||
copyResultFiles(xml_dir, git_dir, repo)
|
||||
metadata_file = os.path.join(git_dir, 'metadata.xml')
|
||||
write_metadata_file(metadata_file, metadata)
|
||||
repo.index.add([metadata_file])
|
||||
repo.index.write()
|
||||
|
||||
# Get information for commit message
|
||||
layer_info = ''
|
||||
for layer, values in metadata['layers'].items():
|
||||
layer_info = '%s%-17s = %s:%s\n' % (layer_info, layer,
|
||||
values.get('branch', '(nogit)'), values.get('commit', '0'*40))
|
||||
msg = 'Selftest for build %s of %s for machine %s on %s\n\n%s' % (
|
||||
log_prefix[12:], metadata['distro']['pretty_name'],
|
||||
metadata['config']['MACHINE'], metadata['hostname'], layer_info)
|
||||
|
||||
log.debug('Commiting results to local repository')
|
||||
repo.index.commit(msg)
|
||||
if not repo.is_dirty():
|
||||
try:
|
||||
if branch in r_branches:
|
||||
log.debug('Pushing changes to remote repository')
|
||||
repo.git.push()
|
||||
else:
|
||||
log.debug('Pushing changes to remote repository '
|
||||
'creating new branch')
|
||||
repo.git.push('-u', 'origin', branch)
|
||||
except GitCommandError:
|
||||
log.error('Falied to push to remote repository')
|
||||
return 1
|
||||
else:
|
||||
log.error('Local repository is dirty, not pushing commits')
|
||||
|
||||
if result.wasSuccessful():
|
||||
return 0
|
||||
else:
|
||||
return 1
|
||||
|
||||
def buildResultClass(args):
|
||||
"""Build a Result Class to use in the testcase execution"""
|
||||
import site
|
||||
|
||||
class StampedResult(TestResult):
|
||||
"""
|
||||
Custom TestResult that prints the time when a test starts. As oe-selftest
|
||||
can take a long time (ie a few hours) to run, timestamps help us understand
|
||||
what tests are taking a long time to execute.
|
||||
If coverage is required, this class executes the coverage setup and reporting.
|
||||
"""
|
||||
def startTest(self, test):
|
||||
import time
|
||||
self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
|
||||
super(StampedResult, self).startTest(test)
|
||||
|
||||
def startTestRun(self):
|
||||
""" Setup coverage before running any testcase """
|
||||
|
||||
# variable holding the coverage configuration file allowing subprocess to be measured
|
||||
self.coveragepth = None
|
||||
|
||||
# indicates the system if coverage is currently installed
|
||||
self.coverage_installed = True
|
||||
|
||||
if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit:
|
||||
try:
|
||||
# check if user can do coverage
|
||||
import coverage
|
||||
except:
|
||||
log.warn("python coverage is not installed. More info on https://pypi.python.org/pypi/coverage")
|
||||
self.coverage_installed = False
|
||||
|
||||
if self.coverage_installed:
|
||||
log.info("Coverage is enabled")
|
||||
|
||||
major_version = int(coverage.version.__version__[0])
|
||||
if major_version < 4:
|
||||
log.error("python coverage %s installed. Require version 4 or greater." % coverage.version.__version__)
|
||||
self.stop()
|
||||
# In case the user has not set the variable COVERAGE_PROCESS_START,
|
||||
# create a default one and export it. The COVERAGE_PROCESS_START
|
||||
# value indicates where the coverage configuration file resides
|
||||
# More info on https://pypi.python.org/pypi/coverage
|
||||
if not os.environ.get('COVERAGE_PROCESS_START'):
|
||||
os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.coverage_source, args.coverage_include, args.coverage_omit)
|
||||
|
||||
# Use default site.USER_SITE and write corresponding config file
|
||||
site.ENABLE_USER_SITE = True
|
||||
if not os.path.exists(site.USER_SITE):
|
||||
os.makedirs(site.USER_SITE)
|
||||
self.coveragepth = os.path.join(site.USER_SITE, "coverage.pth")
|
||||
with open(self.coveragepth, 'w') as cps:
|
||||
cps.write('import sys,site; sys.path.extend(site.getsitepackages()); import coverage; coverage.process_startup();')
|
||||
|
||||
def stopTestRun(self):
|
||||
""" Report coverage data after the testcases are run """
|
||||
|
||||
if args.coverage or args.coverage_source or args.coverage_include or args.coverage_omit:
|
||||
if self.coverage_installed:
|
||||
with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
|
||||
log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
|
||||
log.info("===========================")
|
||||
log.info("\n%s" % "".join(ccf.readlines()))
|
||||
|
||||
log.info("Coverage Report")
|
||||
log.info("===============")
|
||||
try:
|
||||
coverage_report()
|
||||
finally:
|
||||
# remove the pth file
|
||||
try:
|
||||
os.remove(self.coveragepth)
|
||||
except OSError:
|
||||
log.warn("Expected temporal file from coverage is missing, ignoring removal.")
|
||||
|
||||
return StampedResult
|
||||
|
||||
def cleanResultsDir(repo):
|
||||
""" Remove result files from directory """
|
||||
|
||||
xml_files = []
|
||||
directory = repo.working_tree_dir
|
||||
for f in os.listdir(directory):
|
||||
path = os.path.join(directory, f)
|
||||
if os.path.isfile(path) and path.endswith('.xml'):
|
||||
xml_files.append(f)
|
||||
repo.index.remove(xml_files, working_tree=True)
|
||||
|
||||
def copyResultFiles(src, dst, repo):
|
||||
""" Copy result files from src to dst removing the time stamp. """
|
||||
|
||||
import shutil
|
||||
|
||||
re_time = re.compile("-[0-9]+")
|
||||
file_list = []
|
||||
|
||||
for root, subdirs, files in os.walk(src):
|
||||
tmp_dir = root.replace(src, '').lstrip('/')
|
||||
for s in subdirs:
|
||||
os.mkdir(os.path.join(dst, tmp_dir, s))
|
||||
for f in files:
|
||||
file_name = os.path.join(dst, tmp_dir, re_time.sub("", f))
|
||||
shutil.copy2(os.path.join(root, f), file_name)
|
||||
file_list.append(file_name)
|
||||
repo.index.add(file_list)
|
||||
|
||||
class TestRunner(_TestRunner):
|
||||
"""Test runner class aware of exporting tests."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
try:
|
||||
exportdir = os.path.join(os.getcwd(), log_prefix)
|
||||
kwargsx = dict(**kwargs)
|
||||
# argument specific to XMLTestRunner, if adding a new runner then
|
||||
# also add logic to use other runner's args.
|
||||
kwargsx['output'] = exportdir
|
||||
kwargsx['descriptions'] = False
|
||||
# done for the case where telling the runner where to export
|
||||
super(TestRunner, self).__init__(*args, **kwargsx)
|
||||
except TypeError:
|
||||
log.info("test runner init'ed like unittest")
|
||||
super(TestRunner, self).__init__(*args, **kwargs)
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
ret = main()
|
||||
except Exception:
|
||||
|
|
Loading…
Reference in New Issue
Block a user