Skip to content

Commit

Permalink
Refactoring AtomicWaker
Browse files Browse the repository at this point in the history
  • Loading branch information
DoumanAsh committed Oct 27, 2023
1 parent 2b852c7 commit 18695a3
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//!State module

use core::task;
use core::{ptr, task};
use core::cell::UnsafeCell;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};

Expand Down Expand Up @@ -71,18 +71,19 @@ impl AtomicWaker {
match self.state.compare_exchange(WAITING, REGISTERING, Ordering::Acquire, Ordering::Acquire).unwrap_or_else(|err| err) {
WAITING => {
unsafe {
//unconditionally store since we already have ownership
*self.waker.get() = waker;

let res = self.state.compare_exchange(REGISTERING, WAITING, Ordering::AcqRel, Ordering::Acquire);

match res {
match self.state.compare_exchange(REGISTERING, WAITING, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => {}
Err(actual) => {
debug_assert_eq!(actual, REGISTERING | WAKING);

(*self.waker.get()).wake_by_ref();
let mut waker = noop::waker();
ptr::swap(self.waker.get(), &mut waker);

self.state.swap(WAITING, Ordering::AcqRel);
waker.wake();
}
}
}
Expand All @@ -98,8 +99,11 @@ impl AtomicWaker {
match self.state.compare_exchange(WAITING, REGISTERING, Ordering::Acquire, Ordering::Acquire).unwrap_or_else(|err| err) {
WAITING => {
unsafe {
// Locked acquired, update the waker cell
*self.waker.get() = waker.clone();
// Lock acquired, update the waker cell
if !(*self.waker.get()).will_wake(waker) {
//Clone new waker if it is definitely not the same as old one
*self.waker.get() = waker.clone();
}

// Release the lock. If the state transitioned to include
// the `WAKING` bit, this means that a wake has been
Expand All @@ -108,9 +112,7 @@ impl AtomicWaker {
//
// Start by assuming that the state is `REGISTERING` as this
// is what we jut set it to.
let res = self.state.compare_exchange(REGISTERING, WAITING, Ordering::AcqRel, Ordering::Acquire);

match res {
match self.state.compare_exchange(REGISTERING, WAITING, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => {}
Err(actual) => {
// This branch can only be reached if a
Expand All @@ -119,11 +121,12 @@ impl AtomicWaker {
// `WAKING`.
debug_assert_eq!(actual, REGISTERING | WAKING);

// Wake without dropping.
(*self.waker.get()).wake_by_ref();
let mut waker = noop::waker();
ptr::swap(self.waker.get(), &mut waker);

// Just swap, because no one could change state while state == `REGISTERING` | `WAKING`.
self.state.swap(WAITING, Ordering::AcqRel);
waker.wake();
}
}
}
Expand Down Expand Up @@ -156,10 +159,14 @@ impl AtomicWaker {
match self.state.fetch_or(WAKING, Ordering::AcqRel) {
WAITING => {
// The waking lock has been acquired.
unsafe { (*self.waker.get()).wake_by_ref() };
let mut waker = noop::waker();
unsafe {
ptr::swap(self.waker.get(), &mut waker);
}

// Release the lock
self.state.fetch_and(!WAKING, Ordering::Release);
waker.wake();
}
state => {
// There is a concurrent thread currently updating the
Expand Down

0 comments on commit 18695a3

Please sign in to comment.