mirror of
git://git.yoctoproject.org/poky.git
synced 2025-07-19 21:09:03 +02:00

Previously, incrementing "0.9" would result in "0.1.0", which generally gets recognised as a lower version number. Even more surprising, incrementing "0.99" returned "0.1.0.0". This is due to the behaviour of the list function on a string object; it adds each character as an element in a new list, causing the new string '10' to become the list [ '1', '0' ]. Instead of converting a string to a list, add the string to a new list, and concatenate it with the existing list slice. And provide test cases for "0.9" -> "0.10" and related edge cases. (Bitbake rev: 96ddeefa88ff4c37e9ea096726a7cdca5b5b4572) Signed-off-by: Dan McGregor <dan.mcgregor@usask.ca> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
389 lines
18 KiB
Python
389 lines
18 KiB
Python
#! /usr/bin/env python3
|
|
#
|
|
# Copyright (C) 2024 BitBake Contributors
|
|
#
|
|
# SPDX-License-Identifier: GPL-2.0-only
|
|
#
|
|
|
|
from . import create_server, create_client, increase_revision, revision_greater, revision_smaller, _revision_greater_or_equal
|
|
import prserv.db as db
|
|
from bb.asyncrpc import InvokeError
|
|
import logging
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
import socket
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
THIS_DIR = Path(__file__).parent
|
|
BIN_DIR = THIS_DIR.parent.parent / "bin"
|
|
|
|
version = "dummy-1.0-r0"
|
|
pkgarch = "core2-64"
|
|
other_arch = "aarch64"
|
|
|
|
checksumX = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4f0"
|
|
checksum0 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a0"
|
|
checksum1 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a1"
|
|
checksum2 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a2"
|
|
checksum3 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a3"
|
|
checksum4 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a4"
|
|
checksum5 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a5"
|
|
checksum6 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a6"
|
|
checksum7 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a7"
|
|
checksum8 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a8"
|
|
checksum9 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a9"
|
|
checksum10 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4aa"
|
|
|
|
def server_prefunc(server, name):
|
|
logging.basicConfig(level=logging.DEBUG, filename='prserv-%s.log' % name, filemode='w',
|
|
format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
|
|
server.logger.debug("Running server %s" % name)
|
|
sys.stdout = open('prserv-stdout-%s.log' % name, 'w')
|
|
sys.stderr = sys.stdout
|
|
|
|
class PRTestSetup(object):
|
|
|
|
def start_server(self, name, dbfile, upstream=None, read_only=False, prefunc=server_prefunc):
|
|
|
|
def cleanup_server(server):
|
|
if server.process.exitcode is not None:
|
|
return
|
|
server.process.terminate()
|
|
server.process.join()
|
|
|
|
server = create_server(socket.gethostbyname("localhost") + ":0",
|
|
dbfile,
|
|
upstream=upstream,
|
|
read_only=read_only)
|
|
|
|
server.serve_as_process(prefunc=prefunc, args=(name,))
|
|
self.addCleanup(cleanup_server, server)
|
|
|
|
return server
|
|
|
|
def start_client(self, server_address):
|
|
def cleanup_client(client):
|
|
client.close()
|
|
|
|
client = create_client(server_address)
|
|
self.addCleanup(cleanup_client, client)
|
|
|
|
return client
|
|
|
|
class FunctionTests(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
|
|
self.addCleanup(self.temp_dir.cleanup)
|
|
|
|
def test_increase_revision(self):
|
|
self.assertEqual(increase_revision("1"), "2")
|
|
self.assertEqual(increase_revision("1.0"), "1.1")
|
|
self.assertEqual(increase_revision("1.1.1"), "1.1.2")
|
|
self.assertEqual(increase_revision("1.1.1.3"), "1.1.1.4")
|
|
self.assertEqual(increase_revision("9"), "10")
|
|
self.assertEqual(increase_revision("1.9"), "1.10")
|
|
self.assertRaises(ValueError, increase_revision, "1.a")
|
|
self.assertRaises(ValueError, increase_revision, "1.")
|
|
self.assertRaises(ValueError, increase_revision, "")
|
|
|
|
def test_revision_greater_or_equal(self):
|
|
self.assertTrue(_revision_greater_or_equal("2", "2"))
|
|
self.assertTrue(_revision_greater_or_equal("2", "1"))
|
|
self.assertTrue(_revision_greater_or_equal("10", "2"))
|
|
self.assertTrue(_revision_greater_or_equal("1.10", "1.2"))
|
|
self.assertFalse(_revision_greater_or_equal("1.2", "1.10"))
|
|
self.assertTrue(_revision_greater_or_equal("1.10", "1"))
|
|
self.assertTrue(_revision_greater_or_equal("1.10.1", "1.10"))
|
|
self.assertFalse(_revision_greater_or_equal("1.10.1", "1.10.2"))
|
|
self.assertTrue(_revision_greater_or_equal("1.10.1", "1.10.1"))
|
|
self.assertTrue(_revision_greater_or_equal("1.10.1", "1"))
|
|
self.assertTrue(revision_greater("1.20", "1.3"))
|
|
self.assertTrue(revision_smaller("1.3", "1.20"))
|
|
|
|
# DB tests
|
|
|
|
def test_db(self):
|
|
dbfile = os.path.join(self.temp_dir.name, "testtable.sqlite3")
|
|
|
|
self.db = db.PRData(dbfile)
|
|
self.table = self.db["PRMAIN"]
|
|
|
|
self.table.store_value(version, pkgarch, checksum0, "0")
|
|
self.table.store_value(version, pkgarch, checksum1, "1")
|
|
# "No history" mode supports multiple PRs for the same checksum
|
|
self.table.store_value(version, pkgarch, checksum0, "2")
|
|
self.table.store_value(version, pkgarch, checksum2, "1.0")
|
|
|
|
self.assertTrue(self.table.test_package(version, pkgarch))
|
|
self.assertFalse(self.table.test_package(version, other_arch))
|
|
|
|
self.assertTrue(self.table.test_value(version, pkgarch, "0"))
|
|
self.assertTrue(self.table.test_value(version, pkgarch, "1"))
|
|
self.assertTrue(self.table.test_value(version, pkgarch, "2"))
|
|
|
|
self.assertEqual(self.table.find_package_max_value(version, pkgarch), "2")
|
|
|
|
self.assertEqual(self.table.find_min_value(version, pkgarch, checksum0), "0")
|
|
self.assertEqual(self.table.find_max_value(version, pkgarch, checksum0), "2")
|
|
|
|
# Test history modes
|
|
self.assertEqual(self.table.find_value(version, pkgarch, checksum0, True), "0")
|
|
self.assertEqual(self.table.find_value(version, pkgarch, checksum0, False), "2")
|
|
|
|
self.assertEqual(self.table.find_new_subvalue(version, pkgarch, "3"), "3.0")
|
|
self.assertEqual(self.table.find_new_subvalue(version, pkgarch, "1"), "1.1")
|
|
|
|
# Revision comparison tests
|
|
self.table.store_value(version, pkgarch, checksum1, "1.3")
|
|
self.table.store_value(version, pkgarch, checksum1, "1.20")
|
|
self.assertEqual(self.table.find_min_value(version, pkgarch, checksum1), "1")
|
|
self.assertEqual(self.table.find_max_value(version, pkgarch, checksum1), "1.20")
|
|
|
|
class PRBasicTests(PRTestSetup, unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
|
|
self.addCleanup(self.temp_dir.cleanup)
|
|
|
|
dbfile = os.path.join(self.temp_dir.name, "prtest-basic.sqlite3")
|
|
|
|
self.server1 = self.start_server("basic", dbfile)
|
|
self.client1 = self.start_client(self.server1.address)
|
|
|
|
def test_basic(self):
|
|
|
|
# Checks on non existing configuration
|
|
|
|
result = self.client1.test_pr(version, pkgarch, checksum0)
|
|
self.assertIsNone(result, "test_pr should return 'None' for a non existing PR")
|
|
|
|
result = self.client1.test_package(version, pkgarch)
|
|
self.assertFalse(result, "test_package should return 'False' for a non existing PR")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertIsNone(result, "max_package_pr should return 'None' for a non existing PR")
|
|
|
|
# Add a first configuration
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")
|
|
|
|
result = self.client1.test_pr(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "test_pr should return '0' here, matching the result of getPR")
|
|
|
|
result = self.client1.test_package(version, pkgarch)
|
|
self.assertTrue(result, "test_package should return 'True' for an existing PR")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertEqual(result, "0", "max_package_pr should return '0' in the current test series")
|
|
|
|
# Check that the same request gets the same value
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: asking for the same PR a second time in a row should return the same value.")
|
|
|
|
# Add new configurations
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum1)
|
|
self.assertEqual(result, "1", "getPR: second PR of a package should be '1'")
|
|
|
|
result = self.client1.test_pr(version, pkgarch, checksum1)
|
|
self.assertEqual(result, "1", "test_pr should return '1' here, matching the result of getPR")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertEqual(result, "1", "max_package_pr should return '1' in the current test series")
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum2)
|
|
self.assertEqual(result, "2", "getPR: second PR of a package should be '2'")
|
|
|
|
result = self.client1.test_pr(version, pkgarch, checksum2)
|
|
self.assertEqual(result, "2", "test_pr should return '2' here, matching the result of getPR")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertEqual(result, "2", "max_package_pr should return '2' in the current test series")
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum3)
|
|
self.assertEqual(result, "3", "getPR: second PR of a package should be '3'")
|
|
|
|
result = self.client1.test_pr(version, pkgarch, checksum3)
|
|
self.assertEqual(result, "3", "test_pr should return '3' here, matching the result of getPR")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertEqual(result, "3", "max_package_pr should return '3' in the current test series")
|
|
|
|
# Ask again for the first configuration
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "4", "getPR: should return '4' in this configuration")
|
|
|
|
# Ask again with explicit "no history" mode
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0, False)
|
|
self.assertEqual(result, "4", "getPR: should return '4' in this configuration")
|
|
|
|
# Ask again with explicit "history" mode. This should return the first recorded PR for checksum0
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0, True)
|
|
self.assertEqual(result, "0", "getPR: should return '0' in this configuration")
|
|
|
|
# Check again that another pkgarg resets the counters
|
|
|
|
result = self.client1.test_pr(version, other_arch, checksum0)
|
|
self.assertIsNone(result, "test_pr should return 'None' for a non existing PR")
|
|
|
|
result = self.client1.test_package(version, other_arch)
|
|
self.assertFalse(result, "test_package should return 'False' for a non existing PR")
|
|
|
|
result = self.client1.max_package_pr(version, other_arch)
|
|
self.assertIsNone(result, "max_package_pr should return 'None' for a non existing PR")
|
|
|
|
# Now add the configuration
|
|
|
|
result = self.client1.getPR(version, other_arch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")
|
|
|
|
result = self.client1.test_pr(version, other_arch, checksum0)
|
|
self.assertEqual(result, "0", "test_pr should return '0' here, matching the result of getPR")
|
|
|
|
result = self.client1.test_package(version, other_arch)
|
|
self.assertTrue(result, "test_package should return 'True' for an existing PR")
|
|
|
|
result = self.client1.max_package_pr(version, other_arch)
|
|
self.assertEqual(result, "0", "max_package_pr should return '0' in the current test series")
|
|
|
|
result = self.client1.is_readonly()
|
|
self.assertFalse(result, "Server should not be described as 'read-only'")
|
|
|
|
class PRUpstreamTests(PRTestSetup, unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
|
|
self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
|
|
self.addCleanup(self.temp_dir.cleanup)
|
|
|
|
dbfile2 = os.path.join(self.temp_dir.name, "prtest-upstream2.sqlite3")
|
|
self.server2 = self.start_server("upstream2", dbfile2)
|
|
self.client2 = self.start_client(self.server2.address)
|
|
|
|
dbfile1 = os.path.join(self.temp_dir.name, "prtest-upstream1.sqlite3")
|
|
self.server1 = self.start_server("upstream1", dbfile1, upstream=self.server2.address)
|
|
self.client1 = self.start_client(self.server1.address)
|
|
|
|
dbfile0 = os.path.join(self.temp_dir.name, "prtest-local.sqlite3")
|
|
self.server0 = self.start_server("local", dbfile0, upstream=self.server1.address)
|
|
self.client0 = self.start_client(self.server0.address)
|
|
self.shared_db = dbfile0
|
|
|
|
def test_upstream_and_readonly(self):
|
|
|
|
# For identical checksums, all servers should return the same PR
|
|
|
|
result = self.client2.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: initial PR of a package should be '0' (same as upstream)")
|
|
|
|
result = self.client0.getPR(version, pkgarch, checksum0)
|
|
self.assertEqual(result, "0", "getPR: initial PR of a package should be '0' (same as upstream)")
|
|
|
|
# Now introduce new checksums on server1 for, same version
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum1)
|
|
self.assertEqual(result, "0.0", "getPR: first PR of a package which has a different checksum upstream should be '0.0'")
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum2)
|
|
self.assertEqual(result, "0.1", "getPR: second PR of a package that has a different checksum upstream should be '0.1'")
|
|
|
|
# Now introduce checksums on server0 for, same version
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum1)
|
|
self.assertEqual(result, "0.2", "getPR: can't decrease for known PR")
|
|
|
|
result = self.client1.getPR(version, pkgarch, checksum2)
|
|
self.assertEqual(result, "0.3")
|
|
|
|
result = self.client1.max_package_pr(version, pkgarch)
|
|
self.assertEqual(result, "0.3")
|
|
|
|
result = self.client0.getPR(version, pkgarch, checksum3)
|
|
self.assertEqual(result, "0.3.0", "getPR: first PR of a package that doesn't exist upstream should be '0.3.0'")
|
|
|
|
result = self.client0.getPR(version, pkgarch, checksum4)
|
|
self.assertEqual(result, "0.3.1", "getPR: second PR of a package that doesn't exist upstream should be '0.3.1'")
|
|
|
|
result = self.client0.getPR(version, pkgarch, checksum3)
|
|
self.assertEqual(result, "0.3.2")
|
|
|
|
# More upstream updates
|
|
# Here, we assume no communication between server2 and server0. server2 only impacts server0
|
|
# after impacting server1
|
|
|
|
self.assertEqual(self.client2.getPR(version, pkgarch, checksum5), "1")
|
|
self.assertEqual(self.client1.getPR(version, pkgarch, checksum6), "1.0")
|
|
self.assertEqual(self.client1.getPR(version, pkgarch, checksum7), "1.1")
|
|
self.assertEqual(self.client0.getPR(version, pkgarch, checksum8), "1.1.0")
|
|
self.assertEqual(self.client0.getPR(version, pkgarch, checksum9), "1.1.1")
|
|
|
|
# "history" mode tests
|
|
|
|
self.assertEqual(self.client2.getPR(version, pkgarch, checksum0, True), "0")
|
|
self.assertEqual(self.client1.getPR(version, pkgarch, checksum2, True), "0.1")
|
|
self.assertEqual(self.client0.getPR(version, pkgarch, checksum3, True), "0.3.0")
|
|
|
|
# More "no history" mode tests
|
|
|
|
self.assertEqual(self.client2.getPR(version, pkgarch, checksum0), "2")
|
|
self.assertEqual(self.client1.getPR(version, pkgarch, checksum0), "2") # Same as upstream
|
|
self.assertEqual(self.client0.getPR(version, pkgarch, checksum0), "2") # Same as upstream
|
|
self.assertEqual(self.client1.getPR(version, pkgarch, checksum7), "3") # This could be surprising, but since the previous revision was "2", increasing it yields "3".
|
|
# We don't know how many upstream servers we have
|
|
# Start read-only server with server1 as upstream
|
|
self.server_ro = self.start_server("local-ro", self.shared_db, upstream=self.server1.address, read_only=True)
|
|
self.client_ro = self.start_client(self.server_ro.address)
|
|
|
|
self.assertTrue(self.client_ro.is_readonly(), "Database should be described as 'read-only'")
|
|
|
|
# Checks on non existing configurations
|
|
self.assertIsNone(self.client_ro.test_pr(version, pkgarch, checksumX))
|
|
self.assertFalse(self.client_ro.test_package("unknown", pkgarch))
|
|
|
|
# Look up existing configurations
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum0), "3") # "no history" mode
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum0, True), "0") # "history" mode
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum3), "3")
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum3, True), "0.3.0")
|
|
self.assertEqual(self.client_ro.max_package_pr(version, pkgarch), "2") # normal as "3" was never saved
|
|
|
|
# Try to insert a new value. Here this one is know upstream.
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum7), "3")
|
|
# Try to insert a completely new value. As the max upstream value is already "3", it should be "3.0"
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum10), "3.0")
|
|
# Same with another value which only exists in the upstream upstream server
|
|
# This time, as the upstream server doesn't know it, it will ask its upstream server. So that's a known one.
|
|
self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum9), "3")
|
|
|
|
class ScriptTests(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
|
|
self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
|
|
self.addCleanup(self.temp_dir.cleanup)
|
|
self.dbfile = os.path.join(self.temp_dir.name, "prtest.sqlite3")
|
|
|
|
def test_1_start_bitbake_prserv(self):
|
|
try:
|
|
subprocess.check_call([BIN_DIR / "bitbake-prserv", "--start", "-f", self.dbfile])
|
|
except subprocess.CalledProcessError as e:
|
|
self.fail("Failed to start bitbake-prserv: %s" % e.returncode)
|
|
|
|
def test_2_stop_bitbake_prserv(self):
|
|
try:
|
|
subprocess.check_call([BIN_DIR / "bitbake-prserv", "--stop"])
|
|
except subprocess.CalledProcessError as e:
|
|
self.fail("Failed to stop bitbake-prserv: %s" % e.returncode)
|