advance/async/future-excuting #811
Replies: 28 comments 35 replies
-
目前 Rust 缺少 Quartz 一样的大杀器,结合 Future 和 执行器可以搞一个,coding 中 |
Beta Was this translation helpful? Give feedback.
-
大佬帮忙过来看看,下面我对文中的这段代码的注释加了我的理解,请帮忙看看有没有问题: impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() { // 如果发送端还在,且ready_queue里没有任务,主线程就在此阻塞
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
// future是spawner.spawn的整个async块。第一次poll推动执行了块内await前的同步代码,即打印出"任务开始"
// 同步代码执行完TimerFuture::new后返回一个类型为TimerFuture的future,
// 之后的.await驱动执行TimerFuture的poll方法,顺便通过TimerFuture的poll方法把下面future.as_mut().poll(context)传入的context(包含了waker)接续传入TimerFuture内部
// 第一次对TimerFuture做poll返回了Poll::Pending,也就是在.await处返回Poll::Pending
// 那次轮执行外层task就在.await处保存好整个外层task的栈现场(task内.await后的代码不再执行),然后让出主线程的控制权.
// 主线程不可能被挂起,所以继续执行后续同步代码。
// 因为此轮while循环外层future的第一次poll返回了Poll::Pending,所以继续执行if块内部代码
if future.as_mut().poll(context).is_pending() {
// Future还没执行完,因此将它放回任务中,等待下次被poll
// 之所以再放回,是因为前面用了take()。但如果前面不用take(),这里也不用放回了,是否可行?
*future_slot = Some(future);
}
}
}
}
}
fn main() {
let (executor,spawner) = new_executor_and_spawner();
// 将外层task发送到 executor 和 spawner 所在的同步通道中
spawner.spawn(async {
println!("howdy!");
//等待TimerFuture睡醒后执行外层task的wake_by_ref将此外层task再次发送到此同步通道中
TimerFuture::new(Duration::from_secs(2)).await;
println!("done!");
});
drop(spawner); // 因为spawn里复制了一份发送端给外层task,所以spawner被删除后,上面建立的同步通道仍在,下面一句仍能正常执行
executor.run();
} |
Beta Was this translation helpful? Give feedback.
-
有一点不太明白, |
Beta Was this translation helpful? Give feedback.
-
将
改为
是不是更好理解 async .await,不然让他们在包裹了一层就很困惑, 这个拆解完语法糖怎么还需要语法糖 |
Beta Was this translation helpful? Give feedback.
-
我有一个问题,那如果我们自己写的 async 函数就是纯计算函数(不涉及系统 IO,极端点,一个死循环的 for,什么都不干),那是不是这个函数除非运行完,否则完全不能被调度(暂停)? |
Beta Was this translation helpful? Give feedback.
-
想问下各位大佬,在 Executor.run( ) 中 while let Ok(task) = self.ready_queue.recv() { 这个 task 在 while 的第一次循环后,ready_queue 就应该是空的了,那 task 不是应该就 drop 了吗,这个 task 怎么活到了 waker.wake() ? 我特地还把 TimerFuture::new( ) 中 创建新线程:
这段内容全删了,发现程序第二次运行到 Executor:: run ( ) 的 while let Ok(task) = self.ready_queue.recv() { ... } 的时候就 block 住了,而且发现 task 也没 drop ,这是为什么啊? 难道还有别的线程保存了 task ? |
Beta Was this translation helpful? Give feedback.
-
impl Executor { |
Beta Was this translation helpful? Give feedback.
-
PIN的理解还是有点困难,pin的目的说是可以让地址不发生变化,但是什么情况下一个对象的地址会发生变化呢?就像是vector那样重新分配了地址空间的那种? c/c++程序员一时没回过神来... |
Beta Was this translation helpful? Give feedback.
-
这章写得太好了。不仅学到了关于 rust 的异步的实现原理,还通过这个大致推测出 C++ asio 库的实现原理,以及为何其中任何一个异步函数都需要一个 io_context 作为参数传入。之前只会照猫画虎,现在算是彻底明白了,知其然,知其所以然。 |
Beta Was this translation helpful? Give feedback.
-
有跟我一样看懵的吗 |
Beta Was this translation helpful? Give feedback.
-
有一个问题,既然前面说 Future 是惰性的,只在 poll 后才会运行,那么这里的 TimerFuture 也不应该在 new的时候就生成 sleep的工作线程,而应该在 poll 的时候生成线程。🤔 |
Beta Was this translation helpful? Give feedback.
-
纠错: self.task_sender.send(task).expect("任务队列已满"); 仅当 |
Beta Was this translation helpful? Give feedback.
-
thread::spawn(move || { 关于这个TimerFuture请教一个问题,这里为什么在new方法中,要去启动一个线程? 我理解所有的异步任务是实现一个线程内调度多个Future来轮流执行,请问这么理解是否正确?如果这里启动一个线程,相当于一个线程内只有一个Future了,这个是不是反而性能更差了,这样的做法是不是还不如直接多线程? 当然可能这里只是一个例子而已,麻烦楼主回复前边的理解是否是准确的,谢谢! |
Beta Was this translation helpful? Give feedback.
-
spawner.spawn 这个方法, 获取 self 的所有权, main 函数中就不需要 drop 了 |
Beta Was this translation helpful? Give feedback.
-
看过 JYY 老师的视频, 再来看 rust 的线程调度, 别有一番感悟 |
Beta Was this translation helpful? Give feedback.
-
提个建议,由于async其实存在大量类似callback的情况,所以,在讲解的时候最好明确的指出是是谁调用谁,这样有利于树立清楚整个异步调用的过程。当然,如果配合时序图是最好的。 |
Beta Was this translation helpful? Give feedback.
-
按照时间顺序:
main中的Drop只是因为不会再有第二个任务了,所以Drop掉,不可能再被第二个衍生的Task需要、复制了。 |
Beta Was this translation helpful? Give feedback.
-
怎么感觉看完了之后,Poll的实现还是有一些黑盒?看起来 TimeFuture中 new函数中的那个中的函数 thread::spawn(move || {
println!("Start now; expect before executor");
// 执行休息时间
thread::sleep(duration);
let mut s = mv_shared_state.lock().unwrap();
s.completed = true;
if let Some(wake) = s.waker.take() {
println!("Wake here");
wake.wake()
}
}); 是在 impl Future for TimeFuture {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let mut state = self.share_state.lock().unwrap();
println!("Poll time {}", state.poll_times);
state.poll_times = state.poll_times + 1;
if state.completed {
Poll::Ready(())
} else {
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
} 似乎也看不出为啥thread会在poll被调用的时候才执行? |
Beta Was this translation helpful? Give feedback.
-
放回了就能一直运行,不放回let的时候会failed,只能运行一次
|
Beta Was this translation helpful? Give feedback.
-
我想问一个问题 所以以上例子是基于线程的异步实现吗?那是不是其实就和 thread::spawn 后 用join handle的效果是一样的? |
Beta Was this translation helpful? Give feedback.
-
上面完整的代码可以在哪里找到 |
Beta Was this translation helpful? Give feedback.
-
看不懂,没有例子来说明很难看懂啊 |
Beta Was this translation helpful? Give feedback.
-
pub struct SocketRead<'a> { |
Beta Was this translation helpful? Give feedback.
-
例子一些要点:
|
Beta Was this translation helpful? Give feedback.
-
我有个细节的问题没理解到: 麻烦哪位老师解答一下rust实现这个逻辑的位置呢。 |
Beta Was this translation helpful? Give feedback.
-
用过 C++20 Coroutine 自己封装个 runtime 出来就能看懂这里了,原理都一模一样 |
Beta Was this translation helpful? Give feedback.
-
关于执行器的设计部分,是不是实际执行器中,当一个Task返回是Pengding时候,仍然会将Task移出队列,否则在之后Waker时候,又一次将Task发给队列,就重复发送了,而且执行器的关闭时机也不是判断队列是否为空,在示例代码中,当计时器计时完毕时候又wake进行发送,那么这里是不是将一个Task发给队列发了两遍,求大佬解答 |
Beta Was this translation helpful? Give feedback.
-
感覺要自己call wait() 一點都不方便。。。 |
Beta Was this translation helpful? Give feedback.
-
advance/async/future-excuting
https://course.rs/advance/async/future-excuting.html
Beta Was this translation helpful? Give feedback.
All reactions