Fix Subscription cancelation when never awaiting

`StreamExt::forward` will keep polling a ready `Stream` in a loop. If the
`Stream` is always ready, the `poll` method of `Forward` effectively
blocks (see https://github.com/rust-lang/futures-rs/issues/2552).

The fix consists in manually implementing a simpler version of `Forward`.
This commit is contained in:
Héctor Ramón Jiménez 2022-05-27 19:43:50 +02:00
parent 5de337f214
commit ecd00cf02b
No known key found for this signature in database
GPG key ID: 140CC052C94F138E

View file

@ -1,6 +1,9 @@
use crate::{BoxFuture, MaybeSend, Subscription}; use crate::{BoxFuture, MaybeSend, Subscription};
use futures::{channel::mpsc, sink::Sink}; use futures::{
channel::mpsc,
sink::{Sink, SinkExt},
};
use std::{collections::HashMap, marker::PhantomData}; use std::{collections::HashMap, marker::PhantomData};
/// A registry of subscription streams. /// A registry of subscription streams.
@ -64,7 +67,7 @@ where
+ MaybeSend + MaybeSend
+ Clone, + Clone,
{ {
use futures::{future::FutureExt, stream::StreamExt}; use futures::stream::StreamExt;
let mut futures: Vec<BoxFuture<()>> = Vec::new(); let mut futures: Vec<BoxFuture<()>> = Vec::new();
@ -85,19 +88,29 @@ where
continue; continue;
} }
let (cancel, cancelled) = futures::channel::oneshot::channel(); let (cancel, mut canceled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async // TODO: Use bus if/when it supports async
let (event_sender, event_receiver) = let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100); futures::channel::mpsc::channel(100);
let stream = recipe.stream(event_receiver.boxed()); let mut receiver = receiver.clone();
let mut stream = recipe.stream(event_receiver.boxed());
let future = futures::future::select( let future = async move {
cancelled, loop {
stream.map(Ok).forward(receiver.clone()), let select =
) futures::future::select(&mut canceled, stream.next());
.map(|_| ());
match select.await {
futures::future::Either::Left(_)
| futures::future::Either::Right((None, _)) => break,
futures::future::Either::Right((Some(message), _)) => {
let _ = receiver.send(message).await;
}
}
}
};
let _ = self.subscriptions.insert( let _ = self.subscriptions.insert(
id, id,