use futures::Future; use futures::channel::mpsc; use futures::stream::{self, Stream, StreamExt}; pub fn channel(f: impl Fn(mpsc::Sender) -> F) -> impl Stream where F: Future, { let (sender, receiver) = mpsc::channel(1); stream::select( receiver, stream::once(f(sender)).filter_map(|_| async { None }), ) }