Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(十)
Kotlin协程flow缓冲buffer任务流批次任务中选取优先级最高任务最先运行十在 https://blog.csdn.net/zhangphil/article/details/159286201 基础上改进简化LoadMgr提交简单任务的方法 。Kotlin协程Flow结合缓冲(buffer)实现优先级任务调度的改进方案。通过PriorityBlockingQueue存储任务并按优先级排序配合Channel和Flow构建生产者-消费者模型。新增submit()方法简化简单任务提交支持优先级设置和lambda表达式。核心流程包括1)任务入队到优先级队列2)通过Flow缓冲控制任务流速3)触发时取出最高优先级任务执行。实现了4线程池的并发处理并提供了任务取消和结果回调机制。改进后API更简洁支持优先级任务调度和流量控制。package lib import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import java.util.concurrent.PriorityBlockingQueue class LoadMgr { companion object { private const val TAG fly/LoadMgr val INSTANCE LoadMgr() val THREAD_POOL newFixedThreadPoolContext(nThreads 4, name 线程) } private val mChannel ChannelLoadRequest() private val bufferCapacity 10 private val initialCapacity 50 private val mPriorityBlockingQueue PriorityBlockingQueue( initialCapacity, ComparatorLoadRequest { o1, o2 - o2.getPriority()!!.ordinal - o1.getPriority()!!.ordinal }) private constructor() { println($TAG constructor) } fun startup() { //接收任务 CoroutineScope(THREAD_POOL).launch { println($TAG Channel start... ${Thread.currentThread().name}) mChannel.receiveAsFlow() .onEach { it - //生产者 //println($TAG onEach-$it ${Thread.currentThread().name}) }.buffer(bufferCapacity) .collect { it - //消费者 //collect, 这里相当于通过缓冲后匀速发射过来的触发器(trigger)。 //收集到的值在此并不重要这里只是把它作为触发信号。 //println($TAG collect-$it ${Thread.currentThread().name}) trigger() } } } private fun trigger() { val loadRequest mPriorityBlockingQueue.poll() println($TAG 当前最大优先级任务:${loadRequest} ${Thread.currentThread().name}) loadRequest?.let { CoroutineScope(THREAD_POOL).launch { val result if (it.isCancelled()) { println($TAG id${loadRequest.getId()} isCancelled${it.isCancelled()}) returnlaunch } else { it.getListener()?.onStart(it) it.getLoader()?.doInBackground() } println($TAG id${loadRequest} doInBackground完成 isCancelled${loadRequest.isCancelled()} ${Thread.currentThread().name}) if (it.isCancelled()) { // do noting } else { it.getListener()?.onSuccess(it, result) println($TAG deliveryResult loadRequest${loadRequest} ${Thread.currentThread().name}) it.getLoader()?.deliveryResult(result) } } } } fun enqueue(taskInfo: LoadRequest) { CoroutineScope(THREAD_POOL).launch { mPriorityBlockingQueue.add(taskInfo) mChannel.send(taskInfo) } } fun submit(priority: Priority Priority.NORMAL, loader: Loader): LoadRequest { val request LoadRequest.Builder() .priority(priority) .loader(loader) .build() enqueue(request) return request } fun submit(priority: Priority Priority.NORMAL, func: () - Unit): LoadRequest { val loader object : SimpleLoader() { override fun worker() { func.invoke() } } val request LoadRequest.Builder() .priority(priority) .loader(loader) .build() enqueue(request) return request } fun destroy() { mPriorityBlockingQueue.clear() mChannel.cancel() mChannel.close() } }
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2448756.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!