bitbake: hashserv: Add unihash-exists API

Adds API to check if the server is aware of the existence of a given
unihash. This can be used as an optimization for sstate where a client
can query the hash equivalence server to check if a unihash exists
before querying the sstate cache. If the hash server isn't aware of the
existence of a unihash, then there is very likely not a matching sstate
object, so this should be able to significantly cut down on the number
of negative hits on the sstate cache.

(Bitbake rev: cfe0ac071cfb998e4a1dd263f8860b140843361a)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt 2024-02-18 15:59:48 -07:00 committed by Richard Purdie
parent be909636c6
commit 3bd2c69e70
6 changed files with 151 additions and 33 deletions

View File

@ -217,6 +217,14 @@ def main():
print("Removed %d rows" % result["count"])
return 0
def handle_unihash_exists(args, client):
result = client.unihash_exists(args.unihash)
if args.quiet:
return 0 if result else 1
print("true" if result else "false")
return 0
parser = argparse.ArgumentParser(description='Hash Equivalence Client')
parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")')
parser.add_argument('--log', default='WARNING', help='Set logging level')
@ -309,6 +317,11 @@ def main():
gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
gc_sweep_parser.set_defaults(func=handle_gc_sweep)
unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server")
unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not")
unihash_exists_parser.add_argument("unihash", help="Unihash to check")
unihash_exists_parser.set_defaults(func=handle_unihash_exists)
args = parser.parse_args()
logger = logging.getLogger('hashserv')

View File

@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client")
class AsyncClient(bb.asyncrpc.AsyncClient):
MODE_NORMAL = 0
MODE_GET_STREAM = 1
MODE_EXIST_STREAM = 2
def __init__(self, username=None, password=None):
super().__init__("OEHASHEQUIV", "1.1", logger)
@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
await self.socket.send("END")
return await self.socket.recv()
if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
async def normal_to_stream(command):
r = await self.invoke({command: None})
if r != "ok":
raise ConnectionError(
f"Unable to transition to stream mode: Bad response from server {r!r}"
)
self.logger.debug("Mode is now %s", command)
if new_mode == self.mode:
return
self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode)
# Always transition to normal mode before switching to any other mode
if self.mode != self.MODE_NORMAL:
r = await self._send_wrapper(stream_to_normal)
if r != "ok":
self.check_invoke_error(r)
raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r)
elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
r = await self.invoke({"get-stream": None})
if r != "ok":
raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r)
elif new_mode != self.mode:
raise Exception(
"Undefined mode transition %r -> %r" % (self.mode, new_mode)
)
raise ConnectionError(
f"Unable to transition to normal mode: Bad response from server {r!r}"
)
self.logger.debug("Mode is now normal")
if new_mode == self.MODE_GET_STREAM:
await normal_to_stream("get-stream")
elif new_mode == self.MODE_EXIST_STREAM:
await normal_to_stream("exists-stream")
elif new_mode != self.MODE_NORMAL:
raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
self.mode = new_mode
@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
{"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
)
async def unihash_exists(self, unihash):
await self._set_mode(self.MODE_EXIST_STREAM)
r = await self.send_stream(unihash)
return r == "true"
async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
await self._set_mode(self.MODE_NORMAL)
return await self.invoke(
@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client):
"report_unihash",
"report_unihash_equiv",
"get_taskhash",
"unihash_exists",
"get_outhash",
"get_stats",
"reset_stats",

View File

@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
"get": self.handle_get,
"get-outhash": self.handle_get_outhash,
"get-stream": self.handle_get_stream,
"exists-stream": self.handle_exists_stream,
"get-stats": self.handle_get_stats,
"get-db-usage": self.handle_get_db_usage,
"get-db-query-columns": self.handle_get_db_query_columns,
@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
await self.db.insert_outhash(data)
@permissions(READ_PERM)
async def handle_get_stream(self, request):
async def _stream_handler(self, handler):
await self.socket.send_message("ok")
while True:
@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
if l == "END":
break
(method, taskhash) = l.split()
# self.logger.debug('Looking up %s %s' % (method, taskhash))
row = await self.db.get_equivalent(method, taskhash)
if row is not None:
msg = row["unihash"]
# self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
elif self.upstream_client is not None:
upstream = await self.upstream_client.get_unihash(method, taskhash)
if upstream:
msg = upstream
else:
msg = ""
else:
msg = ""
msg = await handler(l)
await self.socket.send(msg)
finally:
request_measure.end()
self.request_sample.end()
# Post to the backfill queue after writing the result to minimize
# the turn around time on a request
if upstream is not None:
await self.server.backfill_queue.put((method, taskhash))
await self.socket.send("ok")
return self.NO_RESPONSE
@permissions(READ_PERM)
async def handle_get_stream(self, request):
async def handler(l):
(method, taskhash) = l.split()
# self.logger.debug('Looking up %s %s' % (method, taskhash))
row = await self.db.get_equivalent(method, taskhash)
if row is not None:
# self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
return row["unihash"]
if self.upstream_client is not None:
upstream = await self.upstream_client.get_unihash(method, taskhash)
if upstream:
await self.server.backfill_queue.put((method, taskhash))
return upstream
return ""
return await self._stream_handler(handler)
@permissions(READ_PERM)
async def handle_exists_stream(self, request):
async def handler(l):
if await self.db.unihash_exists(l):
return "true"
if self.upstream_client is not None:
if await self.upstream_client.unihash_exists(l):
return "true"
return "false"
return await self._stream_handler(handler)
async def report_readonly(self, data):
method = data["method"]
outhash = data["outhash"]

View File

@ -48,6 +48,7 @@ class UnihashesV3(Base):
__table_args__ = (
UniqueConstraint("method", "taskhash"),
Index("taskhash_lookup_v4", "method", "taskhash"),
Index("unihash_lookup_v1", "unihash"),
)
@ -279,6 +280,16 @@ class Database(object):
)
return map_row(result.first())
async def unihash_exists(self, unihash):
async with self.db.begin():
result = await self._execute(
select(UnihashesV3)
.where(UnihashesV3.unihash == unihash)
.limit(1)
)
return result.first() is not None
async def get_outhash(self, method, outhash):
async with self.db.begin():
result = await self._execute(

View File

@ -144,6 +144,9 @@ class DatabaseEngine(object):
cursor.execute(
"CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
)
@ -255,6 +258,19 @@ class Database(object):
)
return cursor.fetchone()
async def unihash_exists(self, unihash):
with closing(self.db.cursor()) as cursor:
cursor.execute(
"""
SELECT * FROM unihashes_v3 WHERE unihash=:unihash
LIMIT 1
""",
{
"unihash": unihash,
},
)
return cursor.fetchone() is not None
async def get_outhash(self, method, outhash):
with closing(self.db.cursor()) as cursor:
cursor.execute(

View File

@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object):
self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
self.assertEqual(result['method'], self.METHOD)
def test_unihash_exsits(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.assertTrue(self.client.unihash_exists(unihash))
self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8'))
def test_ro_server(self):
rw_server = self.start_server()
rw_client = self.start_client(rw_server.address)
@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
def test_stress(self):
self.run_hashclient(["--address", self.server_address, "stress"], check=True)
def test_unihash_exsits(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
p = self.run_hashclient([
"--address", self.server_address,
"unihash-exists", unihash,
], check=True)
self.assertEqual(p.stdout.strip(), "true")
p = self.run_hashclient([
"--address", self.server_address,
"unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
], check=True)
self.assertEqual(p.stdout.strip(), "false")
def test_unihash_exsits_quiet(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
p = self.run_hashclient([
"--address", self.server_address,
"unihash-exists", unihash,
"--quiet",
])
self.assertEqual(p.returncode, 0)
self.assertEqual(p.stdout.strip(), "")
p = self.run_hashclient([
"--address", self.server_address,
"unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
"--quiet",
])
self.assertEqual(p.returncode, 1)
self.assertEqual(p.stdout.strip(), "")
def test_remove_taskhash(self):
taskhash, outhash, unihash = self.create_test_hash(self.client)
self.run_hashclient([