mirror of
git://git.yoctoproject.org/poky.git
synced 2025-07-19 12:59:02 +02:00
bitbake: lib/bb: Add xattr and acl libraries
Adds Python wrappers around the xattr API from libc and the ACL API from libacl. (Bitbake rev: 538011256964d0253f8e3ab7ff1d6fd62c7c2f89) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
parent
8087c261b0
commit
871a4ac6e7
215
bitbake/lib/bb/acl.py
Executable file
215
bitbake/lib/bb/acl.py
Executable file
|
@ -0,0 +1,215 @@
|
|||
#! /usr/bin/env python3
|
||||
#
|
||||
# Copyright 2023 by Garmin Ltd. or its subsidiaries
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
|
||||
import sys
|
||||
import ctypes
|
||||
import os
|
||||
import errno
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
libacl = ctypes.CDLL("libacl.so.1", use_errno=True)
|
||||
|
||||
|
||||
ACL_TYPE_ACCESS = 0x8000
|
||||
ACL_TYPE_DEFAULT = 0x4000
|
||||
|
||||
ACL_FIRST_ENTRY = 0
|
||||
ACL_NEXT_ENTRY = 1
|
||||
|
||||
ACL_UNDEFINED_TAG = 0x00
|
||||
ACL_USER_OBJ = 0x01
|
||||
ACL_USER = 0x02
|
||||
ACL_GROUP_OBJ = 0x04
|
||||
ACL_GROUP = 0x08
|
||||
ACL_MASK = 0x10
|
||||
ACL_OTHER = 0x20
|
||||
|
||||
ACL_READ = 0x04
|
||||
ACL_WRITE = 0x02
|
||||
ACL_EXECUTE = 0x01
|
||||
|
||||
acl_t = ctypes.c_void_p
|
||||
acl_entry_t = ctypes.c_void_p
|
||||
acl_permset_t = ctypes.c_void_p
|
||||
acl_perm_t = ctypes.c_uint
|
||||
|
||||
acl_tag_t = ctypes.c_int
|
||||
|
||||
libacl.acl_free.argtypes = [acl_t]
|
||||
|
||||
|
||||
def acl_free(acl):
|
||||
libacl.acl_free(acl)
|
||||
|
||||
|
||||
libacl.acl_get_file.restype = acl_t
|
||||
libacl.acl_get_file.argtypes = [ctypes.c_char_p, ctypes.c_uint]
|
||||
|
||||
|
||||
def acl_get_file(path, typ):
|
||||
acl = libacl.acl_get_file(os.fsencode(path), typ)
|
||||
if acl is None:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err), str(path))
|
||||
|
||||
return acl
|
||||
|
||||
|
||||
libacl.acl_get_entry.argtypes = [acl_t, ctypes.c_int, ctypes.c_void_p]
|
||||
|
||||
|
||||
def acl_get_entry(acl, entry_id):
|
||||
entry = acl_entry_t()
|
||||
ret = libacl.acl_get_entry(acl, entry_id, ctypes.byref(entry))
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err))
|
||||
|
||||
if ret == 0:
|
||||
return None
|
||||
|
||||
return entry
|
||||
|
||||
|
||||
libacl.acl_get_tag_type.argtypes = [acl_entry_t, ctypes.c_void_p]
|
||||
|
||||
|
||||
def acl_get_tag_type(entry_d):
|
||||
tag = acl_tag_t()
|
||||
ret = libacl.acl_get_tag_type(entry_d, ctypes.byref(tag))
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err))
|
||||
return tag.value
|
||||
|
||||
|
||||
libacl.acl_get_qualifier.restype = ctypes.c_void_p
|
||||
libacl.acl_get_qualifier.argtypes = [acl_entry_t]
|
||||
|
||||
|
||||
def acl_get_qualifier(entry_d):
|
||||
ret = libacl.acl_get_qualifier(entry_d)
|
||||
if ret is None:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err))
|
||||
return ctypes.c_void_p(ret)
|
||||
|
||||
|
||||
libacl.acl_get_permset.argtypes = [acl_entry_t, ctypes.c_void_p]
|
||||
|
||||
|
||||
def acl_get_permset(entry_d):
|
||||
permset = acl_permset_t()
|
||||
ret = libacl.acl_get_permset(entry_d, ctypes.byref(permset))
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err))
|
||||
|
||||
return permset
|
||||
|
||||
|
||||
libacl.acl_get_perm.argtypes = [acl_permset_t, acl_perm_t]
|
||||
|
||||
|
||||
def acl_get_perm(permset_d, perm):
|
||||
ret = libacl.acl_get_perm(permset_d, perm)
|
||||
if ret < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err))
|
||||
return bool(ret)
|
||||
|
||||
|
||||
class Entry(object):
|
||||
def __init__(self, tag, qualifier, mode):
|
||||
self.tag = tag
|
||||
self.qualifier = qualifier
|
||||
self.mode = mode
|
||||
|
||||
def __str__(self):
|
||||
typ = ""
|
||||
qual = ""
|
||||
if self.tag == ACL_USER:
|
||||
typ = "user"
|
||||
qual = pwd.getpwuid(self.qualifier).pw_name
|
||||
elif self.tag == ACL_GROUP:
|
||||
typ = "group"
|
||||
qual = grp.getgrgid(self.qualifier).gr_name
|
||||
elif self.tag == ACL_USER_OBJ:
|
||||
typ = "user"
|
||||
elif self.tag == ACL_GROUP_OBJ:
|
||||
typ = "group"
|
||||
elif self.tag == ACL_MASK:
|
||||
typ = "mask"
|
||||
elif self.tag == ACL_OTHER:
|
||||
typ = "other"
|
||||
|
||||
r = "r" if self.mode & ACL_READ else "-"
|
||||
w = "w" if self.mode & ACL_WRITE else "-"
|
||||
x = "x" if self.mode & ACL_EXECUTE else "-"
|
||||
|
||||
return f"{typ}:{qual}:{r}{w}{x}"
|
||||
|
||||
|
||||
class ACL(object):
|
||||
def __init__(self, acl):
|
||||
self.acl = acl
|
||||
|
||||
def __del__(self):
|
||||
acl_free(self.acl)
|
||||
|
||||
def entries(self):
|
||||
entry_id = ACL_FIRST_ENTRY
|
||||
while True:
|
||||
entry = acl_get_entry(self.acl, entry_id)
|
||||
if entry is None:
|
||||
break
|
||||
|
||||
permset = acl_get_permset(entry)
|
||||
|
||||
mode = 0
|
||||
for m in (ACL_READ, ACL_WRITE, ACL_EXECUTE):
|
||||
if acl_get_perm(permset, m):
|
||||
mode |= m
|
||||
|
||||
qualifier = None
|
||||
tag = acl_get_tag_type(entry)
|
||||
|
||||
if tag == ACL_USER or tag == ACL_GROUP:
|
||||
qual = acl_get_qualifier(entry)
|
||||
qualifier = ctypes.cast(qual, ctypes.POINTER(ctypes.c_int))[0]
|
||||
|
||||
yield Entry(tag, qualifier, mode)
|
||||
|
||||
entry_id = ACL_NEXT_ENTRY
|
||||
|
||||
@classmethod
|
||||
def from_path(cls, path, typ):
|
||||
acl = acl_get_file(path, typ)
|
||||
return cls(acl)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
import pwd
|
||||
import grp
|
||||
from pathlib import Path
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("path", help="File Path", type=Path)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
acl = ACL.from_path(args.path, ACL_TYPE_ACCESS)
|
||||
for entry in acl.entries():
|
||||
print(str(entry))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
126
bitbake/lib/bb/xattr.py
Executable file
126
bitbake/lib/bb/xattr.py
Executable file
|
@ -0,0 +1,126 @@
|
|||
#! /usr/bin/env python3
|
||||
#
|
||||
# Copyright 2023 by Garmin Ltd. or its subsidiaries
|
||||
#
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
import sys
|
||||
import ctypes
|
||||
import os
|
||||
import errno
|
||||
|
||||
libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
||||
fsencoding = sys.getfilesystemencoding()
|
||||
|
||||
|
||||
libc.listxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
|
||||
libc.llistxattr.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_size_t]
|
||||
|
||||
|
||||
def listxattr(path, follow=True):
|
||||
func = libc.listxattr if follow else libc.llistxattr
|
||||
|
||||
os_path = os.fsencode(path)
|
||||
|
||||
while True:
|
||||
length = func(os_path, None, 0)
|
||||
|
||||
if length < 0:
|
||||
err = ctypes.get_errno()
|
||||
raise OSError(err, os.strerror(err), str(path))
|
||||
|
||||
if length == 0:
|
||||
return []
|
||||
|
||||
arr = ctypes.create_string_buffer(length)
|
||||
|
||||
read_length = func(os_path, arr, length)
|
||||
if read_length != length:
|
||||
# Race!
|
||||
continue
|
||||
|
||||
return [a.decode(fsencoding) for a in arr.raw.split(b"\x00") if a]
|
||||
|
||||
|
||||
libc.getxattr.argtypes = [
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_size_t,
|
||||
]
|
||||
libc.lgetxattr.argtypes = [
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_size_t,
|
||||
]
|
||||
|
||||
|
||||
def getxattr(path, name, follow=True):
|
||||
func = libc.getxattr if follow else libc.lgetxattr
|
||||
|
||||
os_path = os.fsencode(path)
|
||||
os_name = os.fsencode(name)
|
||||
|
||||
while True:
|
||||
length = func(os_path, os_name, None, 0)
|
||||
|
||||
if length < 0:
|
||||
err = ctypes.get_errno()
|
||||
if err == errno.ENODATA:
|
||||
return None
|
||||
raise OSError(err, os.strerror(err), str(path))
|
||||
|
||||
if length == 0:
|
||||
return ""
|
||||
|
||||
arr = ctypes.create_string_buffer(length)
|
||||
|
||||
read_length = func(os_path, os_name, arr, length)
|
||||
if read_length != length:
|
||||
# Race!
|
||||
continue
|
||||
|
||||
return arr.raw
|
||||
|
||||
|
||||
def get_all_xattr(path, follow=True):
|
||||
attrs = {}
|
||||
|
||||
names = listxattr(path, follow)
|
||||
|
||||
for name in names:
|
||||
value = getxattr(path, name, follow)
|
||||
if value is None:
|
||||
# This can happen if a value is erased after listxattr is called,
|
||||
# so ignore it
|
||||
continue
|
||||
attrs[name] = value
|
||||
|
||||
return attrs
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("path", help="File Path", type=Path)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
attrs = get_all_xattr(args.path)
|
||||
|
||||
for name, value in attrs.items():
|
||||
try:
|
||||
value = value.decode(fsencoding)
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
|
||||
print(f"{name} = {value}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
Loading…
Reference in New Issue
Block a user