Simplify run and unfold helpers to build a Subscription

This commit is contained in:
Héctor Ramón Jiménez 2022-01-17 15:29:41 +07:00
parent 88f1168a0b
commit ddbbe7353b
No known key found for this signature in database
GPG key ID: 140CC052C94F138E
2 changed files with 69 additions and 96 deletions

View file

@ -1,22 +1,15 @@
use futures::Stream;
use iced_futures::futures;
use iced_native::subscription; use iced_native::subscription;
use std::hash::Hash; use std::hash::Hash;
// Just a little utility function // Just a little utility function
pub fn file<I: 'static + Hash + Copy + Send, T: ToString>( pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I, id: I,
url: T, url: T,
) -> iced::Subscription<(I, Progress)> { ) -> iced::Subscription<(I, Progress)> {
subscription::run( subscription::unfold(id, State::Ready(url.to_string()), move |state| {
id, download(id, state)
Download { })
id,
url: url.to_string(),
},
download,
)
} }
#[derive(Debug, Hash, Clone)] #[derive(Debug, Hash, Clone)]
@ -25,74 +18,63 @@ pub struct Download<I> {
url: String, url: String,
} }
fn download<I: Copy>( async fn download<I: Copy>(
download: Download<I>, id: I,
) -> impl Stream<Item = (I, Progress)> { state: State,
let id = download.id; ) -> (Option<(I, Progress)>, State) {
match state {
State::Ready(url) => {
let response = reqwest::get(&url).await;
futures::stream::unfold( match response {
State::Ready(download.url), Ok(response) => {
move |state| async move { if let Some(total) = response.content_length() {
match state { (
State::Ready(url) => { Some((id, Progress::Started)),
let response = reqwest::get(&url).await;
match response {
Ok(response) => {
if let Some(total) = response.content_length() {
Some((
(id, Progress::Started),
State::Downloading {
response,
total,
downloaded: 0,
},
))
} else {
Some(((id, Progress::Errored), State::Finished))
}
}
Err(_) => {
Some(((id, Progress::Errored), State::Finished))
}
}
}
State::Downloading {
mut response,
total,
downloaded,
} => match response.chunk().await {
Ok(Some(chunk)) => {
let downloaded = downloaded + chunk.len() as u64;
let percentage =
(downloaded as f32 / total as f32) * 100.0;
Some((
(id, Progress::Advanced(percentage)),
State::Downloading { State::Downloading {
response, response,
total, total,
downloaded, downloaded: 0,
}, },
)) )
} else {
(Some((id, Progress::Errored)), State::Finished)
} }
Ok(None) => {
Some(((id, Progress::Finished), State::Finished))
}
Err(_) => Some(((id, Progress::Errored), State::Finished)),
},
State::Finished => {
// We do not let the stream die, as it would start a
// new download repeatedly if the user is not careful
// in case of errors.
let _: () = iced::futures::future::pending().await;
None
} }
Err(_) => (Some((id, Progress::Errored)), State::Finished),
} }
}
State::Downloading {
mut response,
total,
downloaded,
} => match response.chunk().await {
Ok(Some(chunk)) => {
let downloaded = downloaded + chunk.len() as u64;
let percentage = (downloaded as f32 / total as f32) * 100.0;
(
Some((id, Progress::Advanced(percentage))),
State::Downloading {
response,
total,
downloaded,
},
)
}
Ok(None) => (Some((id, Progress::Finished)), State::Finished),
Err(_) => (Some((id, Progress::Errored)), State::Finished),
}, },
) State::Finished => {
// We do not let the stream die, as it would start a
// new download repeatedly if the user is not careful
// in case of errors.
let _: () = iced::futures::future::pending().await;
unreachable!()
}
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View file

@ -60,8 +60,7 @@ where
{ {
Subscription::from_recipe(Runner { Subscription::from_recipe(Runner {
id: f, id: f,
initial: (), spawn: move |events| {
spawn: move |_, events| {
use futures::future; use futures::future;
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -73,31 +72,25 @@ where
} }
/// Returns a [`Subscription`] that will create and asynchronously run the /// Returns a [`Subscription`] that will create and asynchronously run the
/// [`Stream`] returned by the provided closure. /// given [`Stream`].
/// ///
/// The `initial` state will be used to uniquely identify the [`Subscription`]. /// The `id` will be used to uniquely identify the [`Subscription`].
pub fn run<I, T, S, Message>( pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message>
id: I,
initial: T,
f: impl FnOnce(T) -> S + 'static,
) -> Subscription<Message>
where where
I: Hash + 'static, I: Hash + 'static,
T: 'static,
S: Stream<Item = Message> + Send + 'static, S: Stream<Item = Message> + Send + 'static,
Message: 'static, Message: 'static,
{ {
Subscription::from_recipe(Runner { Subscription::from_recipe(Runner {
id, id,
initial, spawn: move |_| stream,
spawn: move |initial, _| f(initial),
}) })
} }
/// Returns a [`Subscription`] that will create and asynchronously run a /// Returns a [`Subscription`] that will create and asynchronously run a
/// [`Stream`] that will call the provided closure to produce every `Message`. /// [`Stream`] that will call the provided closure to produce every `Message`.
/// ///
/// The `initial` state will be used to uniquely identify the [`Subscription`]. /// The `id` will be used to uniquely identify the [`Subscription`].
pub fn unfold<I, T, Fut, Message>( pub fn unfold<I, T, Fut, Message>(
id: I, id: I,
initial: T, initial: T,
@ -112,41 +105,39 @@ where
use futures::future::{self, FutureExt}; use futures::future::{self, FutureExt};
use futures::stream::StreamExt; use futures::stream::StreamExt;
run(id, initial, move |initial| { run(
id,
futures::stream::unfold(initial, move |state| f(state).map(Some)) futures::stream::unfold(initial, move |state| f(state).map(Some))
.filter_map(future::ready) .filter_map(future::ready),
}) )
} }
struct Runner<I, T, F, S, Message> struct Runner<I, F, S, Message>
where where
F: FnOnce(T, EventStream) -> S, F: FnOnce(EventStream) -> S,
S: Stream<Item = Message>, S: Stream<Item = Message>,
{ {
id: I, id: I,
initial: T,
spawn: F, spawn: F,
} }
impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)> impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)>
for Runner<I, T, F, S, Message> for Runner<I, F, S, Message>
where where
I: Hash + 'static, I: Hash + 'static,
T: 'static, F: FnOnce(EventStream) -> S,
F: FnOnce(T, EventStream) -> S,
S: Stream<Item = Message> + Send + 'static, S: Stream<Item = Message> + Send + 'static,
{ {
type Output = Message; type Output = Message;
fn hash(&self, state: &mut Hasher) { fn hash(&self, state: &mut Hasher) {
std::any::TypeId::of::<T>().hash(state); std::any::TypeId::of::<I>().hash(state);
self.id.hash(state); self.id.hash(state);
} }
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
use futures::stream::StreamExt; use futures::stream::StreamExt;
(self.spawn)(self.initial, input).boxed() (self.spawn)(input).boxed()
} }
} }