【Iced】stream.rs文件
usestd::future::Future;usestd::pin::Pin;usestd::task::{Context,Poll};usefutures::stream::Stream;usecrate::subscription::EventStream;/// 将Stream转换为EventStreampubfnfrom_streamMessage,S(stream:S)-EventStreamMessagewhereMessage:staticSend,S:StreamItemMessageSendstatic,{EventStream::from(Box::pin(stream))}/// 将Future转换为EventStreampubfnfrom_futureMessage,F(future:F)-EventStreamMessagewhereMessage:staticSend,F:FutureOutputMessageSendstatic,{from_stream(futures::stream::once(future))}/// 将多个EventStream合并为一个pubfnmergeMessage(streams:VecEventStreamMessage)-EventStreamMessagewhereMessage:staticSend,{ifstreams.is_empty(){returnEventStream::from(Box::pin(futures::stream::empty()));}letmutmergedstreams.into_iter().map(Into::into).collect::Vec_();EventStream::from(Box::pin(futures::stream::select_all(merged)))}/// 创建一个空的EventStreampubfnemptyMessage()-EventStreamMessagewhereMessage:staticSend,{EventStream::from(Box::pin(futures::stream::empty()))}/// 创建一个只包含单个消息的EventStreampubfnonceMessage(message:Message)-EventStreamMessagewhereMessage:staticSend,{from_future(async{message})}/// 创建一个周期触发的EventStreampubfnintervalMessage,F(duration:std::time::Duration,f:F)-EventStreamMessagewhereMessage:staticSend,F:Fn()-MessageSendSyncstatic,{letstreamfutures::stream::unfold((),move|_|asyncmove{tokio::time::sleep(duration).await;Some((f(),()))});EventStream::from(Box::pin(stream))}/// 将Iterator转换为EventStreampubfnfrom_iterMessage,I(iter:I)-EventStreamMessagewhereMessage:staticSend,I:IntoIteratorItemMessageSendstatic,I::IntoIter:Send,{letstreamfutures::stream::iter(iter);EventStream::from(Box::pin(stream))}/// 创建一个可以动态发送消息的EventStreampubfnchannelMessage(buffer:usize)-(mpsc::SenderMessage,EventStreamMessage)whereMessage:staticSend,{let(sender,receiver)mpsc::channel(buffer);(sender,EventStream::from(Box::pin(receiver)))}#[cfg(test)]modtests{usesuper::*;usefutures::StreamExt;#[tokio::test]asyncfntest_from_stream(){letstreamfutures::stream::iter(vec![1,2,3]);letmutevent_streamfrom_stream(stream);assert_eq!(event_stream.next().await,Some(1));assert_eq!(event_stream.next().await,Some(2));assert_eq!(event_stream.next().await,Some(3));assert_eq!(event_stream.next().await,None);}#[tokio::test]asyncfntest_from_future(){letfutureasync{42};letmutevent_streamfrom_future(future);assert_eq!(event_stream.next().await,Some(42));assert_eq!(event_stream.next().await,None);}#[tokio::test]asyncfntest_merge(){letstream1from_iter(vec![1,2,3]);letstream2from_iter(vec![4,5,6]);letstream3from_iter(vec![7,8,9]);letmergedmerge(vec![stream1,stream2,stream3]);letresults:Veci32merged.collect().await;// select_all会随机选择所以只验证长度和包含的元素assert_eq!(results.len(),9);foriin1..9{assert!(results.contains(i));}}#[tokio::test]asyncfntest_empty(){letmutempty_stream:EventStreami32empty();assert_eq!(empty_stream.next().await,None);}#[tokio::test]asyncfntest_once(){letmutonce_streamonce(42);assert_eq!(once_stream.next().await,Some(42));assert_eq!(once_stream.next().await,None);}#[tokio::test]asyncfntest_interval(){usestd::sync::atomic::{AtomicUsize,Ordering};usestd::sync::Arc;usetokio::time::timeout;letcounterArc::new(AtomicUsize::new(0));letcounter_clonecounter.clone();letinterval_streaminterval(std::time::Duration::from_millis(10),move||{counter_clone.fetch_add(1,Ordering::SeqCst);42});// 只取前3个值避免无限等待letmutlimited_streaminterval_stream.take(3);letresults:Veci32limited_stream.collect().await;assert_eq!(results,vec![42,42,42]);assert_eq!(counter.load(Ordering::SeqCst),3);}#[tokio::test]asyncfntest_from_iter(){letvecvec![1,2,3,4,5];letmutiter_streamfrom_iter(vec);foriin1..5{assert_eq!(iter_stream.next().await,Some(i));}assert_eq!(iter_stream.next().await,None);}#[tokio::test]asyncfntest_channel(){let(sender,mutevent_stream)channel(10);// 发送消息sender.send(1).await.unwrap();sender.send(2).await.unwrap();sender.send(3).await.unwrap();// 由于channel是异步的需要稍微等待一下tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;// 丢弃sender关闭通道drop(sender);assert_eq!(event_stream.next().await,Some(1));assert_eq!(event_stream.next().await,Some(2));assert_eq!(event_stream.next().await,Some(3));assert_eq!(event_stream.next().await,None);}}文件说明这是一个用于创建和管理EventStream的工具函数集合。EventStream似乎是某个框架可能是 Iced GUI 框架中用于处理异步事件流的核心类型。主要函数函数描述使用场景from_stream将普通Stream转换为EventStream适配外部Stream到框架from_future将Future转换为单元素EventStream处理一次性异步操作merge合并多个EventStream组合多个事件源empty创建空EventStream默认值或条件分支once创建包含单个消息的Stream立即触发的事件interval创建周期性触发的Stream定时器、心跳、轮询from_iter将Iterator转换为EventStream处理静态数据集合channel创建可动态发送消息的Stream复杂事件生产场景核心模式所有函数都遵循一个统一模式接收各种数据源Stream、Future、Iterator、定时器等统一包装为EventStream类型。这提供了统一接口所有事件源都表现为同一种类型组合能力可以轻松合并、转换事件流灵活性支持多种事件生产方式应用场景这个模块通常用于GUI应用处理用户输入、系统事件实时系统处理数据流、定时任务异步编程统一处理各种异步数据源事件驱动架构作为事件总线的基础通过提供这些工具函数开发者可以专注于业务逻辑而不必关心不同类型事件流的底层实现细节。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2414245.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!