分析Python条件变量如何暂停和唤醒线程
在开始前先了解一下基本使用ConditionCondition本质上是一个带有等待队列的锁它封装了一个底层锁Lock或RLock一个等待者队列_waiters核心方法方法作用wait(timeout)释放锁并阻塞等待直到被notify()唤醒或超时notify(n1)唤醒等待队列中的 n 个线程notify_all()唤醒所有等待的线程acquire()/release()继承自底层锁的获取/释放Condition 的基本使用模式importthreading conditionthreading.Condition()# 线程A等待某个条件defconsumer():withcondition:# 获取锁whilenotsome_condition:# 检查条件condition.wait()# 释放锁等待通知# 条件满足处理数据print(条件满足开始消费)# 线程B满足条件后通知defproducer():withcondition:some_conditionTruecondition.notify()# 通知一个等待者# 或 condition.notify_all() 通知所有等待者这里会产生一个疑惑是什么魔法使得线程暂停和恢复呢线程暂停和恢复的原理wait() 时暂停的是什么以下为源码中的一部分defwait(self,timeoutNone):# 1. 创建一个等待者锁这是普通的 Lock不是 RLockwaiter_allocate_lock()waiter.acquire()# ← 获取这个锁刚创建所以立即成功# 2. 把这个 waiter 添加到等待队列self._waiters.append(waiter)# 3. 释放底层的 Condition 锁允许其他线程进入临界区saved_stateself._release_save()gotitFalsetry:# 4. 关键在这里阻塞等待被唤醒iftimeoutisNone:waiter.acquire()# ← 暂停在这里gotitTrue暂停的是waiter.acquire()这个调用。线程在这个调用上阻塞等待有人释放这个 waiter 锁。解释一下流程new了一个新的锁当前线程acquire了这个新的锁将锁加入到公共管理队列中这个时候锁就是代表线程实例释放了条件变量的公共锁这样其他线程可以使用这个公共锁重要又一次acquire这个新的锁这个时候线程会暂停相当于自己持有了锁又申请了一遍这样实现了线程的暂停notify() 是如何唤醒的defnotify(self,n1):waitersself._waiterswhilewaitersandn0:waiterwaiters[0]try:waiter.release()# ← 释放waiter锁唤醒等待的线程exceptRuntimeError:pass解释一下流程其他线程拥有条件变量之前条件变量的公共锁已释放即可以acquire进入。notify的时候操作公共管理队列可以获取其他锁其他线程自己持有锁被第二次持有暂停了调用锁的release这个地方有点反正常使用常规都是2个线程共有1把锁实现串行执行这样之前锁自己线程的就可以恢复了。AI总结核心机制图解┌─────────────────────────────────────────────────────────────────┐ │ Condition 内部结构 │ ├─────────────────────────────────────────────────────────────────┤ │ _lock: 底层锁 (threading.Lock) │ │ _waiters: 等待者队列 [waiter1, waiter2, waiter3, ...] │ └─────────────────────────────────────────────────────────────────┘wait() 时的操作流程# 线程A调用 wait() 时1.waiter_allocate_lock()# 创建新锁初始状态未锁定2.waiter.acquire()# 获取waiter锁成功3._waiters.append(waiter)# 加入等待队列4._lock.release()# 释放底层Condition锁5.waiter.acquire()# ← 在这里阻塞等待别人释放waiternotify() 时的操作流程# 线程B调用 notify() 时1.waiter_waiters[0]# 取出队首的waiter2.waiter.release()# 释放waiter锁# → 线程A的 waiter.acquire() 成功返回# → 线程A被唤醒关键点总结步骤wait()notify()1创建独立的waiter锁取出队列中的waiter2waiter.acquire()成功waiter.release()释放3加入_waiters队列唤醒等待在该waiter上的线程4释放底层_lock-5在waiter.acquire()阻塞-本质上Condition 使用了一个代理锁的机制。每个等待的线程都创建一个自己的waiter锁调用wait()时自己持有这个锁然后释放底层的 Condition 锁。当notify()时只是释放这个waiter锁线程就自然被唤醒了。引申Semaphore信号量这个类是将Condition条件变量与一个value绑定算是Condition是使用案例了classSemaphore:This class implements semaphore objects. Semaphores manage a counter representing the number of release() calls minus the number of acquire() calls, plus an initial value. The acquire() method blocks if necessary until it can return without making the counter negative. If not given, value defaults to 1. # After Tim Peters semaphore class, but not quite the same (no maximum)def__init__(self,value1):ifvalue0:raiseValueError(semaphore initial value must be 0)self._condCondition(Lock())self._valuevaluedef__repr__(self):clsself.__class__return(f{cls.__module__}.{cls.__qualname__}at{id(self):#x}:f value{self._value})defacquire(self,blockingTrue,timeoutNone):Acquire a semaphore, decrementing the internal counter by one. When invoked without arguments: if the internal counter is larger than zero on entry, decrement it by one and return immediately. If it is zero on entry, block, waiting until some other thread has called release() to make it larger than zero. This is done with proper interlocking so that if multiple acquire() calls are blocked, release() will wake exactly one of them up. The implementation may pick one at random, so the order in which blocked threads are awakened should not be relied on. There is no return value in this case. When invoked with blocking set to true, do the same thing as when called without arguments, and return true. When invoked with blocking set to false, do not block. If a call without an argument would block, return false immediately; otherwise, do the same thing as when called without arguments, and return true. When invoked with a timeout other than None, it will block for at most timeout seconds. If acquire does not complete successfully in that interval, return false. Return true otherwise. ifnotblockingandtimeoutisnotNone:raiseValueError(cant specify timeout for non-blocking acquire)rcFalseendtimeNonewithself._cond:whileself._value0:ifnotblocking:breakiftimeoutisnotNone:ifendtimeisNone:endtime_time()timeoutelse:timeoutendtime-_time()iftimeout0:breakself._cond.wait(timeout)else:self._value-1rcTruereturnrc __enter__acquiredefrelease(self,n1):Release a semaphore, incrementing the internal counter by one or more. When the counter is zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread. ifn1:raiseValueError(n must be one or more)withself._cond:self._valuenforiinrange(n):self._cond.notify()def__exit__(self,t,v,tb):self.release()
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2500134.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!