patchtest: mbox.py: new data implementation

Consolidate and improve some objects:

- absorb utils.py functionality
- repo.py: use mbox.py
- repo.py: remove some cruft
- utils.py: replace with logs.py
- utils.py: delete
- patch.py: delete
- scripts/patchtest: use logging directly
- general cleanup

(From OE-Core rev: d4fbdb1d15f281b236137d63710c73bca8911a36)

Signed-off-by: Trevor Gamblin <tgamblin@baylibre.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Trevor Gamblin 2024-09-24 07:54:59 -04:00 committed by Richard Purdie
parent bb0f1625d7
commit d6ede9c73b
8 changed files with 135 additions and 169 deletions

108
meta/lib/patchtest/mbox.py Normal file
View File

@ -0,0 +1,108 @@
#! /usr/bin/env python3
# series.py
#
# Read a series' mbox file and get information about the patches
# contained
#
# Copyright (C) 2024 BayLibre SAS
#
# SPDX-License-Identifier: GPL-2.0-only
#
import email
import re
# From: https://stackoverflow.com/questions/59681461/read-a-big-mbox-file-with-python
class MboxReader:
def __init__(self, filepath):
self.handle = open(filepath, 'rb')
assert self.handle.readline().startswith(b'From ')
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.handle.close()
def __iter__(self):
return iter(self.__next__())
def __next__(self):
lines = []
while True:
line = self.handle.readline()
if line == b'' or line.startswith(b'From '):
yield email.message_from_bytes(b''.join(lines))
if line == b'':
break
lines = []
continue
lines.append(line)
class Patch:
def __init__(self, data):
self.author = data['From']
self.to = data['To']
self.cc = data['Cc']
self.subject = data['Subject']
self.split_body = re.split('---', data.get_payload(), maxsplit=1)
self.commit_message = self.split_body[0]
self.diff = self.split_body[1]
class PatchSeries:
def __init__(self, filepath):
with MboxReader(filepath) as mbox:
self.patches = [Patch(message) for message in mbox]
assert self.patches
self.patch_count = len(self.patches)
self.path = filepath
@property
def path(self):
return self.path
self.branch = self.get_branch()
def get_branch(self):
fullprefix = ""
pattern = re.compile(r"(\[.*\])", re.DOTALL)
# There should be at least one patch in the series and it should
# include the branch name in the subject, so parse that
match = pattern.search(self.patches[0].subject)
if match:
fullprefix = match.group(1)
branch, branches, valid_branches = None, [], []
if fullprefix:
prefix = fullprefix.strip('[]')
branches = [ b.strip() for b in prefix.split(',')]
valid_branches = [b for b in branches if PatchSeries.valid_branch(b)]
if len(valid_branches):
branch = valid_branches[0]
# Get the branch name excluding any brackets. If nothing was
# found, then assume there was no branch tag in the subject line
# and that the patch targets master
if branch is not None:
return branch.split(']')[0]
else:
return "master"
@staticmethod
def valid_branch(branch):
""" Check if branch is valid name """
lbranch = branch.lower()
invalid = lbranch.startswith('patch') or \
lbranch.startswith('rfc') or \
lbranch.startswith('resend') or \
re.search(r'^v\d+', lbranch) or \
re.search(r'^\d+/\d+', lbranch)
return not invalid

View File

@ -1,43 +0,0 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# patchtestpatch: PatchTestPatch class which abstracts a patch file
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: GPL-2.0-only
#
import logging
import utils
logger = logging.getLogger('patchtest')
class PatchTestPatch(object):
def __init__(self, path, forcereload=False):
self._path = path
self._forcereload = forcereload
self._contents = None
self._branch = None
@property
def contents(self):
if self._forcereload or (not self._contents):
logger.debug('Reading %s contents' % self._path)
try:
with open(self._path, newline='') as _f:
self._contents = _f.read()
except IOError:
logger.warn("Reading the mbox %s failed" % self.resource)
return self._contents
@property
def path(self):
return self._path
@property
def branch(self):
if not self._branch:
self._branch = utils.get_branch(self._path)
return self._branch

View File

@ -10,11 +10,8 @@ import pyparsing
colon = pyparsing.Literal(":")
line_start = pyparsing.LineStart()
line_end = pyparsing.LineEnd()
at = pyparsing.Literal("@")
lessthan = pyparsing.Literal("<")
greaterthan = pyparsing.Literal(">")
opensquare = pyparsing.Literal("[")
closesquare = pyparsing.Literal("]")
inappropriate = pyparsing.CaselessLiteral("Inappropriate")
submitted = pyparsing.CaselessLiteral("Submitted")

View File

@ -8,40 +8,27 @@
# SPDX-License-Identifier: GPL-2.0-only
#
import os
import utils
import logging
import git
from patch import PatchTestPatch
logger = logging.getLogger('patchtest')
info=logger.info
import os
import mbox
class PatchTestRepo(object):
# prefixes used for temporal branches/stashes
prefix = 'patchtest'
def __init__(self, patch, repodir, commit=None, branch=None):
self._repodir = repodir
self._repo = git.Repo.init(repodir)
self._patch = PatchTestPatch(patch)
self._current_branch = self._repo.active_branch.name
self.repodir = repodir
self.repo = git.Repo.init(repodir)
self.patch = mbox.PatchSeries(patch)
self.current_branch = self.repo.active_branch.name
# targeted branch defined on the patch may be invalid, so make sure there
# is a corresponding remote branch
valid_patch_branch = None
if self._patch.branch in self._repo.branches:
valid_patch_branch = self._patch.branch
if self.patch.branch in self.repo.branches:
valid_patch_branch = self.patch.branch
# Target Branch
# Priority (top has highest priority):
# 1. branch given at cmd line
# 2. branch given at the patch
# 3. current branch
self._branch = branch or valid_patch_branch or self._current_branch
# Target Commit
# Priority (top has highest priority):
# 1. commit given at cmd line
@ -57,7 +44,7 @@ class PatchTestRepo(object):
# create working branch. Use the '-B' flag so that we just
# check out the existing one if it's there
self._repo.git.execute(['git', 'checkout', '-B', self._workingbranch, self._commit])
self.repo.git.execute(['git', 'checkout', '-B', self._workingbranch, self._commit])
self._patchmerged = False
@ -65,35 +52,13 @@ class PatchTestRepo(object):
self._patchcanbemerged = True
try:
# Make sure to get the absolute path of the file
self._repo.git.execute(['git', 'apply', '--check', os.path.abspath(self._patch.path)], with_exceptions=True)
self.repo.git.execute(['git', 'apply', '--check', os.path.abspath(self.patch.path)], with_exceptions=True)
except git.exc.GitCommandError as ce:
self._patchcanbemerged = False
# for debugging purposes, print all repo parameters
logger.debug("Parameters")
logger.debug("\tRepository : %s" % self._repodir)
logger.debug("\tTarget Commit : %s" % self._commit)
logger.debug("\tTarget Branch : %s" % self._branch)
logger.debug("\tWorking branch : %s" % self._workingbranch)
logger.debug("\tPatch : %s" % self._patch)
@property
def patch(self):
return self._patch.path
@property
def branch(self):
return self._branch
@property
def commit(self):
return self._commit
@property
def ismerged(self):
return self._patchmerged
@property
def canbemerged(self):
return self._patchcanbemerged
@ -103,7 +68,7 @@ class PatchTestRepo(object):
return None
try:
return self._repo.rev_parse(commit).hexsha
return self.repo.rev_parse(commit).hexsha
except Exception as e:
print(f"Couldn't find commit {commit} in repo")
@ -111,10 +76,10 @@ class PatchTestRepo(object):
def merge(self):
if self._patchcanbemerged:
self._repo.git.execute(['git', 'am', '--keep-cr', os.path.abspath(self._patch.path)])
self.repo.git.execute(['git', 'am', '--keep-cr', os.path.abspath(self.patch.path)])
self._patchmerged = True
def clean(self):
self._repo.git.execute(['git', 'checkout', self._current_branch])
self._repo.git.execute(['git', 'branch', '-D', self._workingbranch])
self.repo.git.execute(['git', 'checkout', self.current_branch])
self.repo.git.execute(['git', 'branch', '-D', self._workingbranch])
self._patchmerged = False

View File

@ -37,7 +37,6 @@ class Base(unittest.TestCase):
endcommit_messages_regex = re.compile(r'\(From \w+-\w+ rev:|(?<!\S)Signed-off-by|(?<!\S)---\n')
patchmetadata_regex = re.compile(r'-{3} \S+|\+{3} \S+|@{2} -\d+,\d+ \+\d+,\d+ @{2} \S+')
@staticmethod
def msg_to_commit(msg):
payload = msg.get_payload()
@ -66,13 +65,13 @@ class Base(unittest.TestCase):
def setUpClass(cls):
# General objects: mailbox.mbox and patchset
cls.mbox = mailbox.mbox(PatchTestInput.repo.patch)
cls.mbox = mailbox.mbox(PatchTestInput.repo.patch.path)
# Patch may be malformed, so try parsing it
cls.unidiff_parse_error = ''
cls.patchset = None
try:
cls.patchset = unidiff.PatchSet.from_filename(PatchTestInput.repo.patch, encoding=u'UTF-8')
cls.patchset = unidiff.PatchSet.from_filename(PatchTestInput.repo.patch.path, encoding=u'UTF-8')
except unidiff.UnidiffParseError as upe:
cls.patchset = []
cls.unidiff_parse_error = str(upe)

View File

@ -168,7 +168,7 @@ class TestMetadata(base.Metadata):
def test_cve_check_ignore(self):
# Skip if we neither modified a recipe or target branches are not
# Nanbield and newer. CVE_CHECK_IGNORE was first deprecated in Nanbield.
if not self.modified or PatchTestInput.repo.branch == "kirkstone" or PatchTestInput.repo.branch == "dunfell":
if not self.modified or PatchTestInput.repo.patch.branch == "kirkstone" or PatchTestInput.repo.patch.branch == "dunfell":
self.skip('No modified recipes or older target branch, skipping test')
for pn in self.modified:
# we are not interested in images

View File

@ -1,61 +0,0 @@
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# utils: common methods used by the patchtest framework
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: GPL-2.0-only
#
import os
import subprocess
import logging
import re
import mailbox
def logger_create(name):
logger = logging.getLogger(name)
loggerhandler = logging.StreamHandler()
loggerhandler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(loggerhandler)
logger.setLevel(logging.INFO)
return logger
def valid_branch(branch):
""" Check if branch is valid name """
lbranch = branch.lower()
invalid = lbranch.startswith('patch') or \
lbranch.startswith('rfc') or \
lbranch.startswith('resend') or \
re.search(r'^v\d+', lbranch) or \
re.search(r'^\d+/\d+', lbranch)
return not invalid
def get_branch(path):
""" Get the branch name from mbox """
fullprefix = ""
mbox = mailbox.mbox(path)
if len(mbox):
subject = mbox[0]['subject']
if subject:
pattern = re.compile(r"(\[.*\])", re.DOTALL)
match = pattern.search(subject)
if match:
fullprefix = match.group(1)
branch, branches, valid_branches = None, [], []
if fullprefix:
prefix = fullprefix.strip('[]')
branches = [ b.strip() for b in prefix.split(',')]
valid_branches = [b for b in branches if valid_branch(b)]
if len(valid_branches):
branch = valid_branches[0]
return branch

View File

@ -9,12 +9,12 @@
# SPDX-License-Identifier: GPL-2.0-only
#
import sys
import os
import unittest
import logging
import traceback
import json
import logging
import os
import sys
import traceback
import unittest
# Include current path so test cases can see it
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
@ -25,13 +25,14 @@ sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '..
from data import PatchTestInput
from repo import PatchTestRepo
import utils
logger = utils.logger_create('patchtest')
logger = logging.getLogger("patchtest")
loggerhandler = logging.StreamHandler()
loggerhandler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(loggerhandler)
logger.setLevel(logging.INFO)
info = logger.info
error = logger.error
import repo
def getResult(patch, mergepatch, logfile=None):
class PatchTestResult(unittest.TextTestResult):