Skip to content

Commit

Permalink
Add tokio03
Browse files Browse the repository at this point in the history
  • Loading branch information
DoumanAsh committed Oct 29, 2020
1 parent 48ae3a8 commit b0bbe9e
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 11 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ jobs:
- name: Test with tokio 0.2
if: runner.os != 'Windows'
run: cargo test --features tokio02
run: cargo test --features tokio02,stream

- name: Test with tokio 0.3
if: runner.os != 'Windows'
run: cargo test --features tokio03 -- --nocapture

- name: Test
run: cargo test --all --features std,stream
Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ std = []
# Enables C API wrapper for platform code.
c_wrapper = ["cc"]
# Enables usage of tokio 0.2
tokio02 = ["mio", "tokio", "nix", "std"]
tokio02 = ["mio", "tokio_02", "nix", "std"]
# Enables usage of tokio 0.3
tokio03 = ["tokio_03", "nix", "std"]
# Enables Stream implementation
stream = ["futures-core"]

Expand All @@ -46,7 +48,8 @@ features = ["threadpoolapiset"]
[target.'cfg(any(target_os = "macos", target_os = "ios", unix))'.dependencies]
libc = { version = "0.2.60", default-features = false }
mio = { version = "0.6", optional = true }
tokio = { version = "0.2", default-features = false, optional = true, features = ["io-driver"] }
tokio_02 = { package = "tokio", version = "0.2", default-features = false, optional = true, features = ["io-driver"] }
tokio_03 = { package = "tokio", version = "0.3.2", default-features = false, optional = true, features = ["net"] }

#kqueue needs nix
[target.'cfg(any(target_os = "bitrig", target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
Expand All @@ -57,7 +60,8 @@ wasm-bindgen = "0.2"

[dev-dependencies]
futures-util = "0.3.0"
tokio = { version = "0.2", features = ["macros", "rt-core"] }
tokio_02 = { package = "tokio", version = "0.2", features = ["macros", "rt-core"] }
tokio_03 = { package = "tokio", version = "0.3.2", default-features = false, features = ["macros", "rt"] }

[build-dependencies.cc]
version = "1"
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
//!
//! ## Features
//!
//! - `tokio02` - Enables event loop based timers using tokio 0.2, providing higher accuracy than regular callback based timers on Linux/BSD/Apple platforms
//! - `tokio02` - Enables event loop based timers using tokio 0.2, providing higher accuracy than regular callback based timers on Linux/BSD/Apple platforms.
//! - `tokio03` - Enables event loop based timers using tokio 0.3, providing higher resolution timer as `tokio02`.
//! - `c_wrapper` - Uses C shim to create bindings to platform API, which may be more reliable than `libc`.
#![warn(missing_docs)]

Expand Down
259 changes: 259 additions & 0 deletions src/timer/async_tokio03.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
use tokio_03 as tokio;

use tokio::io::unix::AsyncFd;
use libc::{c_int};

use core::{task, time};
use core::pin::Pin;
use core::future::Future;

pub trait TimerFd: crate::std::os::unix::io::AsRawFd + Sync + Send + Unpin {
fn new() -> Self;
fn set(&mut self, time: time::Duration);
fn unset(&mut self);
fn read(&mut self) -> usize;
}

///Wrapper over fd based timer.
pub struct RawTimer(c_int);

impl crate::std::os::unix::io::AsRawFd for RawTimer {
#[inline(always)]
fn as_raw_fd(&self) -> c_int {
self.0
}
}

#[cfg(target_os = "android")]
mod sys {
#[repr(C)]
pub struct itimerspec {
pub it_interval: libc::timespec,
pub it_value: libc::timespec,
}

extern "C" {
pub fn timerfd_create(clockid: libc::clockid_t, flags: libc::c_int) -> libc::c_int;
pub fn timerfd_settime(timerid: libc::c_int, flags: libc::c_int, new_value: *const itimerspec, old_value: *mut itimerspec) -> libc::c_int;
}

pub const TFD_NONBLOCK: libc::c_int = libc::O_NONBLOCK;
}

#[cfg(target_os = "linux")]
use libc as sys;

#[cfg(any(target_os = "linux", target_os = "android"))]
impl TimerFd for RawTimer {
fn new() -> Self {
let fd = unsafe { sys::timerfd_create(libc::CLOCK_MONOTONIC, sys::TFD_NONBLOCK) };

os_assert!(fd != -1);
Self(fd)
}

fn set(&mut self, timeout: time::Duration) {
#[cfg(not(target_pointer_width = "64"))]
use core::convert::TryFrom;

let it_value = libc::timespec {
tv_sec: timeout.as_secs() as libc::time_t,
#[cfg(target_pointer_width = "64")]
tv_nsec: libc::suseconds_t::from(timeout.subsec_nanos()),
#[cfg(not(target_pointer_width = "64"))]
tv_nsec: libc::suseconds_t::try_from(timeout.subsec_nanos()).unwrap_or(libc::suseconds_t::max_value()),
};

let timer = sys::itimerspec {
it_interval: unsafe { core::mem::MaybeUninit::zeroed().assume_init() },
it_value,
};

let ret = unsafe { sys::timerfd_settime(self.0, 0, &timer, core::ptr::null_mut()) };
os_assert!(ret != -1);
}

#[inline]
fn unset(&mut self) {
self.set(time::Duration::from_secs(0));
}

fn read(&mut self) -> usize {
let mut read_num = 0u64;
match unsafe { libc::read(self.0, &mut read_num as *mut u64 as *mut _, 8) } {
-1 => {
let error = crate::std::io::Error::last_os_error();
match error.kind() {
crate::std::io::ErrorKind::WouldBlock => 0,
_ => panic!("Unexpected read error: {}", error),
}
}
_ => read_num as usize,
}
}
}

#[cfg(any(target_os = "bitrig", target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd"))]
impl TimerFd for RawTimer {
fn new() -> Self {
let fd = nix::sys::event::kqueue().unwrap_or(-1);

//If you hit this, then most likely you run into OS imposed limit on file descriptor number
os_assert!(fd != -1);
Self(fd)
}

fn set(&mut self, time: time::Duration) {
use nix::sys::event::*;

let flags = EventFlag::EV_ADD | EventFlag::EV_ENABLE | EventFlag::EV_ONESHOT;
let mut time = time.as_nanos();
let mut unit = FilterFlag::NOTE_NSECONDS;

if time > isize::max_value() as u128 {
unit = FilterFlag::NOTE_USECONDS;
time /= 1_000;
}
if time > isize::max_value() as u128 {
unit = FilterFlag::empty(); // default is milliseconds
time /= 1_000;
}
if time > isize::max_value() as u128 {
unit = FilterFlag::NOTE_SECONDS;
time /= 1_000;
}

let time = time as isize;
kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, unit, time, 0)], &mut [], 0).expect("To arm timer");
}

fn unset(&mut self) {
use nix::sys::event::*;

let flags = EventFlag::EV_DELETE;
kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, FilterFlag::empty(), 0, 0)], &mut [], 0).expect("To disarm timer");
}

fn read(&mut self) -> usize {
use nix::sys::event::*;

let mut ev = [KEvent::new(0, EventFilter::EVFILT_TIMER, EventFlag::empty(), FilterFlag::empty(), 0, 0)];

kevent(self.0, &[], &mut ev[..], 0).expect("To execute kevent")
}
}

enum State<T> {
Init(time::Duration),
Running(T, bool),
}

///Timer implemented on top of `AsyncFd`
pub struct AsyncTokioTimer<T: TimerFd> {
state: State<AsyncFd<T>>
}

impl AsyncTokioTimer<RawTimer> {
#[inline]
///Creates new instance
pub const fn new(time: time::Duration) -> Self {
Self {
state: State::Init(time),
}
}
}

impl<T: TimerFd> super::Timer for AsyncTokioTimer<T> {
#[inline(always)]
fn new(timeout: time::Duration) -> Self {
assert_time!(timeout);
debug_assert!(timeout.as_millis() <= u32::max_value().into());
Self {
state: State::Init(timeout),
}
}

#[inline]
fn is_ticking(&self) -> bool {
match &self.state {
State::Init(_) => false,
State::Running(_, state) => !*state,
}
}

#[inline]
fn is_expired(&self) -> bool {
match &self.state {
State::Init(_) => false,
State::Running(_, state) => *state
}
}

fn restart(&mut self, new_value: time::Duration) {
assert_time!(new_value);
debug_assert!(new_value.as_millis() <= u32::max_value().into());

match &mut self.state {
State::Init(ref mut timeout) => {
*timeout = new_value;
},
State::Running(ref mut fd, ref mut state) => {
*state = false;
fd.get_mut().set(new_value);
}
}
}

#[inline(always)]
fn restart_ctx(&mut self, new_value: time::Duration, _: &task::Waker) {
self.restart(new_value)
}

fn cancel(&mut self) {
unreachable!();
}
}

impl<T: TimerFd> Future for AsyncTokioTimer<T> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
if let State::Init(ref timeout) = &self.state {
let mut fd = AsyncFd::new(T::new()).expect("To create AsyncFd");
fd.get_mut().set(*timeout);
self.state = State::Running(fd, false)
};

if let State::Running(ref mut fd, ref mut state) = &mut self.state {
if *state {
return task::Poll::Ready(());
}

let fd = Pin::new(fd);
match fd.poll_read_ready(ctx) {
task::Poll::Pending => return task::Poll::Pending,
task::Poll::Ready(ready) => {
let mut ready = ready.expect("Unable to read async timer's fd");
//technically we should read first, but we cannot borrow as mut then
ready.clear_ready();

match fd.get_mut().get_mut().read() {
0 => {
*state = false;
return task::Poll::Pending
},
_ => {
*state = true;
return task::Poll::Ready(())
}
}
}
}
} else {
unreach!();
}
}
}

///Timer based on tokio's `AsyncFd`
pub type AsyncTimer = AsyncTokioTimer<RawTimer>;
4 changes: 2 additions & 2 deletions src/timer/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Drop for RawTimer {

enum State {
Init(time::Duration),
Running(tokio::io::PollEvented<RawTimer>, bool),
Running(tokio_02::io::PollEvented<RawTimer>, bool),
}

///Timer based on `kqueue`
Expand Down Expand Up @@ -157,7 +157,7 @@ impl Future for KqueueTimer {
loop {
self.state = match &mut self.state {
State::Init(ref timeout) => {
let fd = tokio::io::PollEvented::new(RawTimer::new()).expect("To create PollEvented");
let fd = tokio_02::io::PollEvented::new(RawTimer::new()).expect("To create PollEvented");
fd.get_ref().set(*timeout);
State::Running(fd, false)
}
Expand Down
12 changes: 10 additions & 2 deletions src/timer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ pub type Platform = win::WinTimer;
///Platform alias to Windows timer
pub type SyncPlatform = win::WinTimer;

#[cfg(all(feature = "tokio03", unix))]
mod async_tokio03;
#[cfg(all(feature = "tokio03", unix))]
pub use async_tokio03::AsyncTimer;
#[cfg(all(feature = "tokio03", unix))]
///Timer based on tokio's `AsyncFd`
pub type Platform = AsyncTimer;

#[cfg(all(feature = "tokio02", any(target_os = "linux", target_os = "android")))]
mod timer_fd;
#[cfg(all(feature = "tokio02", any(target_os = "linux", target_os = "android")))]
Expand All @@ -148,7 +156,7 @@ pub use timer_fd::TimerFd;
mod posix;
#[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))]
pub use posix::PosixTimer;
#[cfg(all(not(feature = "tokio02"), not(any(target_os = "macos", target_os = "ios")), unix))]
#[cfg(all(not(feature = "tokio02"), not(feature = "tokio03"), not(any(target_os = "macos", target_os = "ios")), unix))]
///Platform alias to POSIX timer
pub type Platform = posix::PosixTimer;
#[cfg(all(feature = "tokio02", any(target_os = "linux", target_os = "android")))]
Expand All @@ -166,7 +174,7 @@ pub use kqueue::KqueueTimer;
mod apple;
#[cfg(any(target_os = "macos", target_os = "ios"))]
pub use apple::AppleTimer;
#[cfg(all(not(feature = "tokio02"), any(target_os = "macos", target_os = "ios")))]
#[cfg(all(not(feature = "tokio02"), not(feature = "tokio03"), any(target_os = "macos", target_os = "ios")))]
///Platform alias to Apple Dispatch timer
pub type Platform = apple::AppleTimer;
#[cfg(all(feature = "tokio02", any(target_os = "bitrig", target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd")))]
Expand Down
4 changes: 2 additions & 2 deletions src/timer/timer_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn set_timer_value(fd: &RawTimer, timeout: time::Duration) {

enum State {
Init(time::Duration),
Running(tokio::io::PollEvented<RawTimer>, bool),
Running(tokio_02::io::PollEvented<RawTimer>, bool),
}

///Linux `timerfd` wrapper
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Future for TimerFd {
loop {
self.state = match &mut self.state {
State::Init(ref timeout) => {
let fd = tokio::io::PollEvented::new(RawTimer::new()).expect("To create PollEvented");
let fd = tokio_02::io::PollEvented::new(RawTimer::new()).expect("To create PollEvented");
set_timer_value(fd.get_ref(), *timeout);
State::Running(fd, false)
}
Expand Down
4 changes: 4 additions & 0 deletions tests/interval.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use async_timer::{Interval};
#[cfg(feature = "tokio02")]
use tokio_02 as tokio;
#[cfg(not(feature = "tokio02"))]
use tokio_03 as tokio;

use std::time;

Expand Down
Loading

0 comments on commit b0bbe9e

Please sign in to comment.