bitbake: asyncrpc: Prefix log messages with client info

Adds a logging adaptor to the asyncrpc clients that prefixes log
messages with the client remote address to aid in debugging

(Bitbake rev: f4d64ce73c2449c008ff5d9b32376a2893ef7195)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt 2023-11-03 08:26:23 -06:00 committed by Richard Purdie
parent 8ae00cf20d
commit 4cdb0f00f9
2 changed files with 23 additions and 8 deletions

View File

@ -12,10 +12,16 @@ import signal
import socket
import sys
import multiprocessing
import logging
from .connection import StreamConnection, WebsocketConnection
from .exceptions import ClientError, ServerError, ConnectionClosedError
class ClientLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return f"[Client {self.extra['address']}] {msg}", kwargs
class AsyncServerConnection(object):
# If a handler returns this object (e.g. `return self.NO_RESPONSE`), no
# return message will be automatically be sent back to the client
@ -27,7 +33,12 @@ class AsyncServerConnection(object):
self.handlers = {
"ping": self.handle_ping,
}
self.logger = logger
self.logger = ClientLoggerAdapter(
logger,
{
"address": socket.address,
},
)
async def close(self):
await self.socket.close()
@ -242,16 +253,20 @@ class AsyncServer(object):
self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
async def _client_handler(self, socket):
address = socket.address
try:
client = self.accept_client(socket)
await client.process_requests()
except Exception as e:
import traceback
self.logger.error("Error from client: %s" % str(e), exc_info=True)
self.logger.error(
"Error from client %s: %s" % (address, str(e)), exc_info=True
)
traceback.print_exc()
finally:
self.logger.debug("Client %s disconnected", address)
await socket.close()
self.logger.debug("Client disconnected")
@abc.abstractmethod
def accept_client(self, socket):

View File

@ -207,7 +207,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
async def dispatch_message(self, msg):
for k in self.handlers.keys():
if k in msg:
logger.debug('Handling %s' % k)
self.logger.debug('Handling %s' % k)
if 'stream' in k:
return await self.handlers[k](msg[k])
else:
@ -351,7 +351,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
break
(method, taskhash) = l.split()
#logger.debug('Looking up %s %s' % (method, taskhash))
#self.logger.debug('Looking up %s %s' % (method, taskhash))
cursor = self.db.cursor()
try:
row = self.query_equivalent(cursor, method, taskhash)
@ -360,7 +360,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
if row is not None:
msg = row['unihash']
#logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
#self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
elif self.upstream_client is not None:
upstream = await self.upstream_client.get_unihash(method, taskhash)
if upstream:
@ -480,8 +480,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
row = self.query_equivalent(cursor, data['method'], data['taskhash'])
if row['unihash'] == data['unihash']:
logger.info('Adding taskhash equivalence for %s with unihash %s',
data['taskhash'], row['unihash'])
self.logger.info('Adding taskhash equivalence for %s with unihash %s',
data['taskhash'], row['unihash'])
d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}