ANDROID: rust_binder: add work lists

The binder driver uses linked lists of work items to store events that
need to be delivered to userspace. Work lists exist at both the process
level and the thread level, as sketched below.
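
A rough sketch of the layout this patch introduces (simplified; the
real definitions are in the diff below):

    pub(crate) struct ProcessInner {
        /// Threads currently waiting for work in `get_work`.
        ready_threads: List<Thread>,
        /// Work queued on the process, delivered to whichever thread
        /// reads next.
        work: List<DTRWrap<dyn DeliverToRead>>,
        // ...
    }

    struct InnerThread {
        /// Work that must be delivered to this specific thread.
        work_list: List<DTRWrap<dyn DeliverToRead>>,
        // ...
    }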

Work items are expected to implement the `DeliverToRead` trait, whose
name signifies that this type is something that can be delivered to
userspace via the read part of the `BINDER_WRITE_READ` ioctl. The trait
defines what happens when a work item is executed, when it is cancelled,
how the thread should be notified (`wake_up_interruptible_sync` or
`wake_up_interruptible`?), and how it can be enqueued to a linked list.
For each type that implements the trait, Rust generates a vtable.
Pointers to the `dyn DeliverToRead` type are fat pointers whose metadata
is a pointer to that vtable.
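
As a standalone illustration (not part of this patch), the fat-pointer
layout can be observed directly: a raw pointer to a trait object is two
words wide, the data pointer plus the vtable pointer.

    trait DeliverToRead {
        fn do_work(&self);
    }

    // A `*const dyn DeliverToRead` carries a vtable pointer as its
    // metadata, so it is twice the size of a thin pointer.
    const _: () = assert!(
        core::mem::size_of::<*const dyn DeliverToRead>()
            == 2 * core::mem::size_of::<*const ()>()
    );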

We introduce the concept of a "ready thread": a thread that is
currently waiting for work items inside the `get_work` method. The
process keeps track of its ready threads and delivers new work items to
one of them directly. When there are no ready threads, work items are
stored in the process work list. Note that not all work items are
delivered through the "ready thread" mechanism. For example, a later
patch in this series delivers transactions that are already part of a
transaction stack directly to the appropriate thread.
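
Condensed, the delivery logic added to `ProcessInner::push_work` in
this patch reads:

    if let Some(thread) = self.ready_threads.pop_front() {
        // A thread is waiting in `get_work`; hand the item over and
        // wake that thread up.
        match thread.push_work(work) {
            PushWorkRes::Ok => Ok(()),
            PushWorkRes::FailedDead(work) => Err((BinderError::new_dead(), work)),
        }
    } else if self.is_dead {
        // Return the item so the caller can drop it after releasing
        // the process lock.
        Err((BinderError::new_dead(), work))
    } else {
        // Nobody is waiting; queue the item on the process.
        self.work.push_back(work);
        Ok(())
    }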

The work lists added in this patch are not used yet, so the `push_work`
methods are marked with `#[allow(dead_code)]` to silence the warnings
about unused methods. A user is added in the next patch of this patch
set.

Link: https://lore.kernel.org/rust-for-linux/20231101-rust-binder-v1-4-08ba9197f637@google.com/
Change-Id: I4a7c2db2a9b49996d50ca9544e8eb187d45f0566
Co-developed-by: Wedson Almeida Filho <wedsonaf@gmail.com>
Signed-off-by: Wedson Almeida Filho <wedsonaf@gmail.com>
Signed-off-by: Alice Ryhl <aliceryhl@google.com>
Bug: 278052745

@@ -13,6 +13,15 @@ pub(crate) struct BinderError {
     source: Option<Error>,
 }
 
+impl BinderError {
+    pub(crate) fn new_dead() -> Self {
+        Self {
+            reply: BR_DEAD_REPLY,
+            source: None,
+        }
+    }
+}
+
 /// Convert an errno into a `BinderError` and store the errno used to construct it. The errno
 /// should be stored as the thread's extended error when given to userspace.
 impl From<Error> for BinderError {

@@ -16,19 +16,25 @@ use kernel::{
     bindings,
     cred::Credential,
     file::{self, File},
-    list::{HasListLinks, ListArc, ListArcSafe, ListItem, ListLinks},
+    list::{HasListLinks, List, ListArc, ListArcSafe, ListItem, ListLinks},
     mm,
     prelude::*,
     rbtree::{self, RBTree},
     sync::poll::PollTable,
-    sync::{Arc, ArcBorrow, SpinLock},
+    sync::{lock::Guard, Arc, ArcBorrow, SpinLock},
     task::Task,
-    types::ARef,
+    types::{ARef, Either},
     uaccess::{UserSlice, UserSliceReader},
     workqueue::{self, Work},
 };
 
-use crate::{context::Context, defs::*, thread::Thread};
+use crate::{
+    context::Context,
+    defs::*,
+    error::BinderError,
+    thread::{PushWorkRes, Thread},
+    DLArc, DTRWrap, DeliverToRead,
+};
 
 use core::mem::take;
@@ -37,8 +43,11 @@ const PROC_DEFER_RELEASE: u8 = 2;
 /// The fields of `Process` protected by the spinlock.
 pub(crate) struct ProcessInner {
-    is_dead: bool,
+    pub(crate) is_dead: bool,
     threads: RBTree<i32, Arc<Thread>>,
+    /// INVARIANT: Threads pushed to this list must be owned by this process.
+    ready_threads: List<Thread>,
+    work: List<DTRWrap<dyn DeliverToRead>>,
 
     /// The number of requested threads that haven't registered yet.
     requested_thread_count: u32,
@@ -56,6 +65,8 @@ impl ProcessInner {
         Self {
             is_dead: false,
             threads: RBTree::new(),
+            ready_threads: List::new(),
+            work: List::new(),
             requested_thread_count: 0,
             max_threads: 0,
             started_thread_count: 0,
@@ -63,6 +74,37 @@ impl ProcessInner {
         }
     }
 
+    /// Schedule the work item for execution on this process.
+    ///
+    /// If any threads are ready for work, then the work item is given directly to that thread and
+    /// it is woken up. Otherwise, it is pushed to the process work list.
+    ///
+    /// This call can fail only if the process is dead. In this case, the work item is returned to
+    /// the caller so that the caller can drop it after releasing the inner process lock. This is
+    /// necessary since the destructor of `Transaction` will take locks that can't necessarily be
+    /// taken while holding the inner process lock.
+    #[allow(dead_code)]
+    pub(crate) fn push_work(
+        &mut self,
+        work: DLArc<dyn DeliverToRead>,
+    ) -> Result<(), (BinderError, DLArc<dyn DeliverToRead>)> {
+        // Try to find a ready thread to which to push the work.
+        if let Some(thread) = self.ready_threads.pop_front() {
+            // Push to thread while holding state lock. This prevents the thread from giving up
+            // (for example, because of a signal) when we're about to deliver work.
+            match thread.push_work(work) {
+                PushWorkRes::Ok => Ok(()),
+                PushWorkRes::FailedDead(work) => Err((BinderError::new_dead(), work)),
+            }
+        } else if self.is_dead {
+            Err((BinderError::new_dead(), work))
+        } else {
+            // There are no ready threads. Push work to process queue.
+            self.work.push_back(work);
+            Ok(())
+        }
+    }
+
     fn register_thread(&mut self) -> bool {
         if self.requested_thread_count == 0 {
             return false;
@@ -154,6 +196,31 @@ impl Process {
         Ok(process)
     }
 
+    /// Attempts to fetch a work item from the process queue.
+    pub(crate) fn get_work(&self) -> Option<DLArc<dyn DeliverToRead>> {
+        self.inner.lock().work.pop_front()
+    }
+
+    /// Attempts to fetch a work item from the process queue. If none is available, it registers the
+    /// given thread as ready to receive work directly.
+    ///
+    /// This must only be called when the thread is not participating in a transaction chain; when
+    /// it is, work will always be delivered directly to the thread (and not through the process
+    /// queue).
+    pub(crate) fn get_work_or_register<'a>(
+        &'a self,
+        thread: &'a Arc<Thread>,
+    ) -> Either<DLArc<dyn DeliverToRead>, Registration<'a>> {
+        let mut inner = self.inner.lock();
+
+        // Try to get work from the process queue.
+        if let Some(work) = inner.work.pop_front() {
+            return Either::Left(work);
+        }
+
+        // Register the thread as ready.
+        Either::Right(Registration::new(thread, &mut inner))
+    }
+
     fn get_current_thread(self: ArcBorrow<'_, Self>) -> Result<Arc<Thread>> {
         let id = {
             let current = kernel::current!();
@@ -207,8 +274,9 @@ impl Process {
     pub(crate) fn needs_thread(&self) -> bool {
         let mut inner = self.inner.lock();
-        let ret =
-            inner.requested_thread_count == 0 && inner.started_thread_count < inner.max_threads;
+        let ret = inner.requested_thread_count == 0
+            && inner.ready_threads.is_empty()
+            && inner.started_thread_count < inner.max_threads;
         if ret {
             inner.requested_thread_count += 1
         }
@@ -216,7 +284,10 @@ impl Process {
     }
 
     fn deferred_flush(&self) {
-        // NOOP for now.
+        let inner = self.inner.lock();
+        for thread in inner.threads.values() {
+            thread.exit_looper();
+        }
     }
 
     fn deferred_release(self: Arc<Self>) {
@@ -224,6 +295,11 @@ impl Process {
         self.ctx.deregister_process(&self);
 
+        // Cancel all pending work items.
+        while let Some(work) = self.get_work() {
+            work.into_arc().cancel();
+        }
+
         // Move the threads out of `inner` so that we can iterate over them without holding the
         // lock.
         let mut inner = self.inner.lock();
@@ -360,3 +436,39 @@ impl Process {
         Err(EINVAL)
     }
 }
+
+/// Represents that a thread has registered with the `ready_threads` list of its process.
+///
+/// The destructor of this type will unregister the thread from the list of ready threads.
+pub(crate) struct Registration<'a> {
+    thread: &'a Arc<Thread>,
+}
+
+impl<'a> Registration<'a> {
+    fn new(
+        thread: &'a Arc<Thread>,
+        guard: &mut Guard<'_, ProcessInner, kernel::sync::lock::spinlock::SpinLockBackend>,
+    ) -> Self {
+        assert!(core::ptr::eq(&thread.process.inner, guard.lock()));
+        // INVARIANT: We are pushing this thread to the right `ready_threads` list.
+        if let Ok(list_arc) = ListArc::try_from_arc(thread.clone()) {
+            guard.ready_threads.push_front(list_arc);
+        } else {
+            // It is an error to hit this branch, and it should not be reachable. We try to do
+            // something reasonable when the failure path happens. Most likely, the thread in
+            // question will sleep forever.
+            pr_err!("Same thread registered with `ready_threads` twice.");
+        }
+        Self { thread }
+    }
+}
+
+impl Drop for Registration<'_> {
+    fn drop(&mut self) {
+        let mut inner = self.thread.process.inner.lock();
+        // SAFETY: The thread has the invariant that we never push it to any other linked list than
+        // the `ready_threads` list of its parent process. Therefore, the thread is either in that
+        // list, or in no list.
+        unsafe { inner.ready_threads.remove(self.thread) };
+    }
+}

@@ -7,13 +7,17 @@
 use kernel::{
     bindings::{self, seq_file},
     file::File,
+    list::{
+        HasListLinks, ListArc, ListArcSafe, ListItem, ListLinks, ListLinksSelfPtr, TryNewListArc,
+    },
     prelude::*,
     sync::poll::PollTable,
     sync::Arc,
     types::ForeignOwnable,
+    uaccess::UserSliceWriter,
 };
 
-use crate::{context::Context, process::Process};
+use crate::{context::Context, process::Process, thread::Thread};
 
 mod context;
 mod defs;
@@ -29,6 +33,87 @@ module! {
     license: "GPL",
 }
 
+/// Specifies how a type should be delivered to the read part of a BINDER_WRITE_READ ioctl.
+///
+/// When a value is pushed to the todo list for a process or thread, it is stored as a trait object
+/// with the type `Arc<dyn DeliverToRead>`. Trait objects are a Rust feature that lets you
+/// implement dynamic dispatch over many different types. This lets us store many different types
+/// in the todo list.
+trait DeliverToRead: ListArcSafe + Send + Sync {
+    /// Performs work. Returns true if remaining work items in the queue should be processed
+    /// immediately, or false if it should return to the caller before processing additional work
+    /// items.
+    fn do_work(self: DArc<Self>, thread: &Thread, writer: &mut UserSliceWriter) -> Result<bool>;
+
+    /// Cancels the given work item. This is called instead of [`DeliverToRead::do_work`] when work
+    /// won't be delivered.
+    fn cancel(self: DArc<Self>) {}
+
+    /// Should we use `wake_up_interruptible_sync` or `wake_up_interruptible` when scheduling this
+    /// work item?
+    ///
+    /// Generally only set to true for non-oneway transactions.
+    fn should_sync_wakeup(&self) -> bool;
+
+    /// Get the debug name of this type.
+    fn debug_name(&self) -> &'static str {
+        core::any::type_name::<Self>()
+    }
+}
+
+// Wrapper around a `DeliverToRead` with linked list links.
+#[pin_data]
+struct DTRWrap<T: ?Sized> {
+    #[pin]
+    links: ListLinksSelfPtr<DTRWrap<dyn DeliverToRead>>,
+    #[pin]
+    wrapped: T,
+}
+kernel::list::impl_has_list_links_self_ptr! {
+    impl HasSelfPtr<DTRWrap<dyn DeliverToRead>> for DTRWrap<dyn DeliverToRead> { self.links }
+}
+kernel::list::impl_list_arc_safe! {
+    impl{T: ListArcSafe + ?Sized} ListArcSafe<0> for DTRWrap<T> {
+        tracked_by wrapped: T;
+    }
+}
+kernel::list::impl_list_item! {
+    impl ListItem<0> for DTRWrap<dyn DeliverToRead> {
+        using ListLinksSelfPtr;
+    }
+}
+
+impl<T: ?Sized> core::ops::Deref for DTRWrap<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.wrapped
+    }
+}
+
+impl<T: ?Sized> core::ops::Receiver for DTRWrap<T> {}
+
+type DArc<T> = kernel::sync::Arc<DTRWrap<T>>;
+type DLArc<T> = kernel::list::ListArc<DTRWrap<T>>;
+
+impl<T: ListArcSafe> DTRWrap<T> {
+    #[allow(dead_code)]
+    fn arc_try_new(val: T) -> Result<DLArc<T>, alloc::alloc::AllocError> {
+        ListArc::pin_init(pin_init!(Self {
+            links <- ListLinksSelfPtr::new(),
+            wrapped: val,
+        }))
+        .map_err(|_| alloc::alloc::AllocError)
+    }
+
+    #[allow(dead_code)]
+    fn arc_pin_init(init: impl PinInit<T>) -> Result<DLArc<T>, kernel::error::Error> {
+        ListArc::pin_init(pin_init!(Self {
+            links <- ListLinksSelfPtr::new(),
+            wrapped <- init,
+        }))
+    }
+}
+
 struct BinderModule {}
 
 impl kernel::Module for BinderModule {

@@ -9,24 +9,51 @@
 use kernel::{
     bindings,
+    list::{
+        AtomicListArcTracker, HasListLinks, List, ListArcSafe, ListItem, ListLinks, TryNewListArc,
+    },
     prelude::*,
-    sync::{Arc, SpinLock},
+    sync::{Arc, CondVar, SpinLock},
+    types::Either,
     uaccess::UserSlice,
 };
 
-use crate::{defs::*, process::Process};
+use crate::{defs::*, process::Process, DLArc, DTRWrap, DeliverToRead};
 
 use core::mem::size_of;
 
+pub(crate) enum PushWorkRes {
+    Ok,
+    FailedDead(DLArc<dyn DeliverToRead>),
+}
+
+impl PushWorkRes {
+    fn is_ok(&self) -> bool {
+        match self {
+            PushWorkRes::Ok => true,
+            PushWorkRes::FailedDead(_) => false,
+        }
+    }
+}
+
 /// The fields of `Thread` protected by the spinlock.
 struct InnerThread {
     /// Determines the looper state of the thread. It is a bit-wise combination of the constants
     /// prefixed with `LOOPER_`.
     looper_flags: u32,
 
+    /// Determines whether the looper should return.
+    looper_need_return: bool,
+
     /// Determines if thread is dead.
     is_dead: bool,
 
+    /// Determines whether the work list below should be processed. When set to false, `work_list`
+    /// is treated as if it were empty.
+    process_work_list: bool,
+
+    /// List of work items to deliver to userspace.
+    work_list: List<DTRWrap<dyn DeliverToRead>>,
+
     /// Extended error information for this thread.
     extended_error: ExtendedError,
 }
@@ -35,6 +62,8 @@ const LOOPER_REGISTERED: u32 = 0x01;
 const LOOPER_ENTERED: u32 = 0x02;
 const LOOPER_EXITED: u32 = 0x04;
 const LOOPER_INVALID: u32 = 0x08;
+const LOOPER_WAITING: u32 = 0x10;
+const LOOPER_WAITING_PROC: u32 = 0x20;
 
 impl InnerThread {
     fn new() -> Self {
@@ -47,11 +76,42 @@ impl InnerThread {
         Self {
             looper_flags: 0,
+            looper_need_return: false,
             is_dead: false,
+            process_work_list: false,
+            work_list: List::new(),
             extended_error: ExtendedError::new(next_err_id(), BR_OK, 0),
         }
     }
 
+    fn pop_work(&mut self) -> Option<DLArc<dyn DeliverToRead>> {
+        if !self.process_work_list {
+            return None;
+        }
+
+        let ret = self.work_list.pop_front();
+        self.process_work_list = !self.work_list.is_empty();
+        ret
+    }
+
+    #[allow(dead_code)]
+    fn push_work(&mut self, work: DLArc<dyn DeliverToRead>) -> PushWorkRes {
+        if self.is_dead {
+            PushWorkRes::FailedDead(work)
+        } else {
+            self.work_list.push_back(work);
+            self.process_work_list = true;
+            PushWorkRes::Ok
+        }
+    }
+
+    /// Used to push work items that do not need to be processed immediately and can wait until the
+    /// thread gets another work item.
+    #[allow(dead_code)]
+    fn push_work_deferred(&mut self, work: DLArc<dyn DeliverToRead>) {
+        self.work_list.push_back(work);
+    }
+
     fn looper_enter(&mut self) {
         self.looper_flags |= LOOPER_ENTERED;
         if self.looper_flags & LOOPER_REGISTERED != 0 {
@@ -74,6 +134,16 @@ impl InnerThread {
     fn is_looper(&self) -> bool {
         self.looper_flags & (LOOPER_ENTERED | LOOPER_REGISTERED) != 0
     }
+
+    /// Determines whether the thread should attempt to fetch work items from the process queue.
+    /// This is generally the case when the thread is registered as a looper. But if there is
+    /// local work, we want to return to userspace before we deliver any remote work.
+    ///
+    /// In future patches, it will also be required that the thread is not part of a transaction
+    /// stack.
+    fn should_use_process_work_queue(&self) -> bool {
+        !self.process_work_list && self.is_looper()
+    }
 }
 
 /// This represents a thread that's used with binder.
@@ -83,6 +153,29 @@ pub(crate) struct Thread {
     pub(crate) process: Arc<Process>,
     #[pin]
     inner: SpinLock<InnerThread>,
+    #[pin]
+    work_condvar: CondVar,
+    /// Used to insert this thread into the process' `ready_threads` list.
+    ///
+    /// INVARIANT: May never be used for any other list than the `self.process.ready_threads`.
+    #[pin]
+    links: ListLinks,
+    #[pin]
+    links_track: AtomicListArcTracker,
+}
+
+kernel::list::impl_has_list_links! {
+    impl HasListLinks<0> for Thread { self.links }
+}
+kernel::list::impl_list_arc_safe! {
+    impl ListArcSafe<0> for Thread {
+        tracked_by links_track: AtomicListArcTracker;
+    }
+}
+kernel::list::impl_list_item! {
+    impl ListItem<0> for Thread {
+        using ListLinks;
+    }
 }
 
 impl Thread {
@@ -91,6 +184,9 @@ impl Thread {
             id,
             process,
             inner <- kernel::new_spinlock!(InnerThread::new(), "Thread::inner"),
+            work_condvar <- kernel::new_condvar!("Thread::work_condvar"),
+            links <- ListLinks::new(),
+            links_track <- AtomicListArcTracker::new(),
         }))
     }
@@ -101,6 +197,122 @@ impl Thread {
         Ok(())
     }
 
+    /// Attempts to fetch a work item from the thread-local queue. The behaviour if the queue is
+    /// empty depends on `wait`: if it is true, the function waits for some work to be queued (or a
+    /// signal); otherwise it returns indicating that none is available.
+    fn get_work_local(self: &Arc<Self>, wait: bool) -> Result<Option<DLArc<dyn DeliverToRead>>> {
+        {
+            let mut inner = self.inner.lock();
+            if inner.looper_need_return {
+                return Ok(inner.pop_work());
+            }
+        }
+
+        // Try once if the caller does not want to wait.
+        if !wait {
+            return self.inner.lock().pop_work().ok_or(EAGAIN).map(Some);
+        }
+
+        // Loop waiting only on the local queue (i.e., not registering with the process queue).
+        let mut inner = self.inner.lock();
+        loop {
+            if let Some(work) = inner.pop_work() {
+                return Ok(Some(work));
+            }
+
+            inner.looper_flags |= LOOPER_WAITING;
+            let signal_pending = self.work_condvar.wait_interruptible(&mut inner);
+            inner.looper_flags &= !LOOPER_WAITING;
+
+            if signal_pending {
+                return Err(EINTR);
+            }
+            if inner.looper_need_return {
+                return Ok(None);
+            }
+        }
+    }
+
+    /// Attempts to fetch a work item from the thread-local queue, falling back to the process-wide
+    /// queue if none is available locally.
+    ///
+    /// This must only be called when the thread is not participating in a transaction chain. If it
+    /// is, the local version (`get_work_local`) should be used instead.
+    fn get_work(self: &Arc<Self>, wait: bool) -> Result<Option<DLArc<dyn DeliverToRead>>> {
+        // Try to get work from the thread's work queue, using only a local lock.
+        {
+            let mut inner = self.inner.lock();
+            if let Some(work) = inner.pop_work() {
+                return Ok(Some(work));
+            }
+            if inner.looper_need_return {
+                drop(inner);
+                return Ok(self.process.get_work());
+            }
+        }
+
+        // If the caller doesn't want to wait, try to grab work from the process queue.
+        //
+        // We know nothing will have been queued directly to the thread queue because it is not in
+        // a transaction and it is not in the process' ready list.
+        if !wait {
+            return self.process.get_work().ok_or(EAGAIN).map(Some);
+        }
+
+        // Get work from the process queue. If none is available, atomically register as ready.
+        let reg = match self.process.get_work_or_register(self) {
+            Either::Left(work) => return Ok(Some(work)),
+            Either::Right(reg) => reg,
+        };
+
+        let mut inner = self.inner.lock();
+        loop {
+            if let Some(work) = inner.pop_work() {
+                return Ok(Some(work));
+            }
+
+            inner.looper_flags |= LOOPER_WAITING | LOOPER_WAITING_PROC;
+            let signal_pending = self.work_condvar.wait_interruptible(&mut inner);
+            inner.looper_flags &= !(LOOPER_WAITING | LOOPER_WAITING_PROC);
+
+            if signal_pending || inner.looper_need_return {
+                // We need to return now. We need to pull the thread off the list of ready threads
+                // (by dropping `reg`), then check the state again after it's off the list to
+                // ensure that something was not queued in the meantime. If something has been
+                // queued, we just return it (instead of the error).
+                drop(inner);
+                drop(reg);
+                let res = match self.inner.lock().pop_work() {
+                    Some(work) => Ok(Some(work)),
+                    None if signal_pending => Err(EINTR),
+                    None => Ok(None),
+                };
+                return res;
+            }
+        }
+    }
+
+    /// Push the provided work item to be delivered to user space via this thread.
+    ///
+    /// Returns whether the item was successfully pushed. This can only fail if the thread is dead.
+    #[allow(dead_code)]
+    pub(crate) fn push_work(&self, work: DLArc<dyn DeliverToRead>) -> PushWorkRes {
+        let sync = work.should_sync_wakeup();
+
+        let res = self.inner.lock().push_work(work);
+
+        if res.is_ok() {
+            if sync {
+                self.work_condvar.notify_sync();
+            } else {
+                self.work_condvar.notify_one();
+            }
+        }
+
+        res
+    }
+
     fn write(self: &Arc<Self>, req: &mut BinderWriteRead) -> Result {
         let write_start = req.write_buffer.wrapping_add(req.write_consumed);
         let write_len = req.write_size - req.write_consumed;
@@ -128,11 +340,19 @@ impl Thread {
         Ok(())
     }
 
-    fn read(self: &Arc<Self>, req: &mut BinderWriteRead, _wait: bool) -> Result {
+    fn read(self: &Arc<Self>, req: &mut BinderWriteRead, wait: bool) -> Result {
         let read_start = req.read_buffer.wrapping_add(req.read_consumed);
         let read_len = req.read_size - req.read_consumed;
         let mut writer = UserSlice::new(read_start as _, read_len as _).writer();
-        let in_pool = self.inner.lock().is_looper();
+        let (in_pool, use_proc_queue) = {
+            let inner = self.inner.lock();
+            (inner.is_looper(), inner.should_use_process_work_queue())
+        };
+        let getter = if use_proc_queue {
+            Self::get_work
+        } else {
+            Self::get_work_local
+        };
 
         // Reserve some room at the beginning of the read buffer so that we can send a
         // BR_SPAWN_LOOPER if we need to.
@@ -146,13 +366,35 @@ impl Thread {
         }
 
         // Loop doing work while there is room in the buffer.
-        #[allow(clippy::never_loop)]
+        let initial_len = writer.len();
         while writer.len() >= size_of::<bindings::binder_transaction_data_secctx>() + 4 {
-            // There is enough space in the output buffer to process another work item.
-            //
-            // However, we have not yet added work items to the driver, so we immediately break
-            // from the loop.
-            break;
+            match getter(self, wait && initial_len == writer.len()) {
+                Ok(Some(work)) => {
+                    let work_ty = work.debug_name();
+                    match work.into_arc().do_work(self, &mut writer) {
+                        Ok(true) => {}
+                        Ok(false) => break,
+                        Err(err) => {
+                            pr_warn!("Failure inside do_work of type {}.", work_ty);
+                            return Err(err);
+                        }
+                    }
+                }
+                Ok(None) => {
+                    break;
+                }
+                Err(err) => {
+                    // Propagate the error if we haven't written anything else.
+                    if err != EINTR && err != EAGAIN {
+                        pr_warn!("Failure in work getter: {:?}", err);
+                    }
+                    if initial_len == writer.len() {
+                        return Err(err);
+                    } else {
+                        break;
+                    }
+                }
+            }
         }
 
         req.read_consumed += read_len - writer.len() as u64;
@@ -179,6 +421,7 @@ impl Thread {
             );
             req.read_consumed = 0;
             writer.write(&req)?;
+            self.inner.lock().looper_need_return = false;
             return Err(err);
         }
     }
@@ -198,10 +441,32 @@ impl Thread {
         // Write the request back so that the consumed fields are visible to the caller.
         writer.write(&req)?;
 
+        self.inner.lock().looper_need_return = false;
+
         ret
     }
 
+    /// Make the call to `get_work` or `get_work_local` return immediately, if any.
+    pub(crate) fn exit_looper(&self) {
+        let mut inner = self.inner.lock();
+        let should_notify = inner.looper_flags & LOOPER_WAITING != 0;
+        if should_notify {
+            inner.looper_need_return = true;
+        }
+        drop(inner);
+
+        if should_notify {
+            self.work_condvar.notify_one();
+        }
+    }
+
     pub(crate) fn release(self: &Arc<Self>) {
         self.inner.lock().is_dead = true;
+
+        // Cancel all pending work items.
+        while let Ok(Some(work)) = self.get_work_local(false) {
+            work.into_arc().cancel();
+        }
     }
 }

@@ -144,7 +144,12 @@ pub struct Guard<'a, T: ?Sized, B: Backend> {
 // SAFETY: `Guard` is sync when the data protected by the lock is also sync.
 unsafe impl<T: Sync + ?Sized, B: Backend> Sync for Guard<'_, T, B> {}
 
-impl<T: ?Sized, B: Backend> Guard<'_, T, B> {
+impl<'a, T: ?Sized, B: Backend> Guard<'a, T, B> {
+    /// Returns the lock that this guard originates from.
+    pub fn lock(&self) -> &'a Lock<T, B> {
+        self.lock
+    }
+
     pub(crate) fn do_unlocked<U>(&mut self, cb: impl FnOnce() -> U) -> U {
         // SAFETY: The caller owns the lock, so it is safe to unlock it.
         unsafe { B::unlock(self.lock.state.get(), &self.state) };

@@ -262,7 +262,7 @@ $(obj)/%.lst: $(src)/%.c FORCE
 # Compile Rust sources (.rs)
 # ---------------------------------------------------------------------------
 
-rust_allowed_features := const_maybe_uninit_zeroed,new_uninit,offset_of,allocator_api
+rust_allowed_features := const_maybe_uninit_zeroed,new_uninit,offset_of,allocator_api,receiver_trait
 
 # `--out-dir` is required to avoid temporaries being created by `rustc` in the
 # current working directory, which may be not accessible in the out-of-tree