[Python3高阶编程] - [Python3高阶编程] - 异步编程深度学习指南三:手动实现AsyncRLock
一、手动实现AsyncRLockimport asyncio from typing import Optional class AsyncRLock: def __init__(self): self._lock asyncio.Lock() # 底层互斥锁 self._owner: Optional[asyncio.Task] None # 当前持有锁的协程Task self._count 0 # 可重入计数 async def acquire(self) - bool: current_task asyncio.current_task() if self._owner is current_task: # 同一协程重入计数1 self._count 1 return True else: # 不同协程需获取底层锁 await self._lock.acquire() self._owner current_task self._count 1 return True def release(self) - None: current_task asyncio.current_task() if self._owner is not current_task: raise RuntimeError(Cannot release un-acquired lock) self._count - 1 if self._count 0: self._owner None self._lock.release() def locked(self) - bool: return self._owner is not None def __enter__(self): raise RuntimeError( Use async with instead of with for AsyncRLock ) def __exit__(self, *args): pass async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc_val, exc_tb): self.release()✅ 二、使用示例验证可重入性import asyncio async def inner(lock): async with lock: print(Inner acquired) return inner async def outer(lock): async with lock: print(Outer acquired) result await inner(lock) print(Back in outer) return result async def other(lock): async with lock: print(Other task acquired) async def main(): rlock AsyncRLock() # 测试可重入outer → inner result await outer(rlock) print(Result:, result) # 测试互斥另一个协程必须等待 task asyncio.create_task(other(rlock)) await asyncio.sleep(0.1) # 确保 other 尝试获取锁 print(Other task should be blocked...) await task # 此时才会执行 other asyncio.run(main())✅ 预期输出Outer acquired Inner acquired Back in outer Result: inner Other task should be blocked... Other task acquired⚠️ 三、关键注意事项与潜在问题1.必须绑定到asyncio.Task使用asyncio.current_task()获取当前协程身份而非线程 ID❌ 错误做法用id(asyncio.current_task())或其他标识Task 对象本身可哈希且唯一2.异常安全确保release()总是被调用必须通过__aexit__自动释放即只允许async with手动调用acquire/release容易漏掉release尤其在异常路径3.不要混用with和async with实现中__enter__主动报错防止误用同步上下文管理器4.不支持跨事件循环current_task()依赖当前 loop不能在多个 loop 间共享5.性能开销每次acquire都需检查current_task()但开销极小比asyncio.Lock略慢但远优于死锁6.不能用于await表达式直接返回设计为上下文管理器或显式 acquire/release不是 awaitable四、常见错误实现避坑指南❌ 错误 1用计数器但不绑定协程# 危险多个协程可能交错导致计数错误 class BadRLock: def __init__(self): self._count 0 self._lock asyncio.Lock() async def acquire(self): if self._count 0: self._count 1 # ❌ 无协程绑定多协程会混乱 else: await self._lock.acquire() self._count 1❌ 错误 2忘记异常安全async def bad_usage(lock): await lock.acquire() raise ValueError(Oops!) # release() 永远不会被调用 lock.release()✅正确始终用async with✅ 六、总结手动实现AsyncRLock的核心是用asyncio.Lock保证跨协程互斥用current_task()绑定持有者用计数器支持可重入通过__aexit__保证异常安全
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2473947.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!