[Python3高阶编程] - 异步编程深度学习指南(一):基础知识(源代码)
#!/home/admin/.pyenv/versions/3.9.12/bin/python
# -*- coding: utf-8 -*-
"""Asyncio deep-dive study guide, part 1: basics (demo script).

Original article:
https://blog.csdn.net/andylin02/article/details/159649164?spm=1001.2014.3001.5502

The script walks through, in order:

* awaiting a coroutine (``greet`` / ``test_call_greet``),
* scheduling coroutines as tasks (``test_by_create_task``),
* fanning out with ``asyncio.gather`` (``test_get_by_gather``),
* capping concurrency with a semaphore (``TestSemaphoreGet``),
* off-loading a blocking call to an executor (``test_call_blocking_io``).

The HTTP demos require the third-party ``aiohttp`` package and network
access to https://baidu.com.
"""

import aiohttp
import asyncio
from asyncio import Semaphore


async def fetch_sample(session, url):
    """Print ``url`` and return the placeholder string ``"xx"``.

    ``session`` is accepted only for signature parity with ``fetch``; it is
    not used.
    """
    print(url)
    return "xx"


async def fetch(session, url):
    """GET ``url`` through ``session`` and return the response body as text."""
    async with session.get(url) as resp:
        return await resp.text()


async def test_fetch(client_index):
    """Fetch https://baidu.com in a fresh session and print a preview.

    Prints the first 100 characters of the body tagged with ``client_index``
    and returns the string ``"client: <client_index>"``.
    """
    # loop = asyncio.get_running_loop()
    # print(f"loop: {loop}")

    async with aiohttp.ClientSession() as session:
        result = await fetch(session, "https://baidu.com")

        # loop = asyncio.get_running_loop()
        # print(f"loop: {loop}")

        print(f"[{client_index}] result: {result[:100]}")
        return f"client: {client_index}"


# ``async def`` defines a coroutine.
async def greet():
    """Print "Hello", wait one second without blocking, print "World"; return 1."""
    print("Hello")
    await asyncio.sleep(1)  # non-blocking wait
    print("World")
    return 1


# Running the coroutine directly:
# asyncio.run(greet())


async def test_call_greet():
    """Await ``greet()`` from another coroutine and print its return value."""
    ret = await greet()
    print(f"end of call greet: {ret}")


async def task(name, delay):
    """Print a start marker, sleep ``delay`` seconds, print a done marker.

    Returns ``name`` unchanged.
    """
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return name


async def test_by_create_task():
    """Run three ``task`` coroutines concurrently via ``asyncio.create_task``."""
    # Create the tasks; each one is scheduled as soon as it is created.
    t1 = asyncio.create_task(task("任务1", 2))
    t2 = asyncio.create_task(task("任务2", 1))
    t3 = asyncio.create_task(task("任务3", 3))

    # loop = asyncio.get_running_loop()
    # print(f"main- loop: {loop}")

    # Wait for all tasks to finish:
    # results = await asyncio.gather(t1, t2, t3)
    # print(f"结果: {results}")

    # Equivalent to the gather() call above.
    await t1
    await t2
    await t3
    # Total elapsed time is about 3 seconds, not 6, because the sleeps overlap.


async def test_get_by_gather():
    """Run four ``test_fetch`` calls concurrently and print their results."""
    # Alternative using explicit tasks:
    # t11 = asyncio.create_task(test_fetch(1))
    # t12 = asyncio.create_task(test_fetch(2))
    # t13 = asyncio.create_task(test_fetch(3))
    # t14 = asyncio.create_task(test_fetch(4))
    # ls_ret = await asyncio.gather(t11, t12, t13, t14)
    # print(f"结果: {ls_ret}")

    ls_ret = await asyncio.gather(
        test_fetch(1),
        test_fetch(2),
        test_fetch(3),
        test_fetch(4),
    )
    print(f"结果: {ls_ret}")


class TestSemaphoreGet:
    """Run ``test_fetch`` with at most 10 calls in flight at a time.

    ``GlobalInit`` must be called before ``call_fetch_with_max_concurrent``;
    until then ``M_SEMLOCK`` is ``None``.
    """

    # Shared by all instances; set by GlobalInit() to Semaphore(10),
    # i.e. at most 10 concurrent fetches.
    M_SEMLOCK = None

    @classmethod
    def GlobalInit(cls):
        """Create the class-wide semaphore with 10 permits."""
        # NOTE(review): presumably created here rather than at class-definition
        # time so that it is constructed while the event loop is running
        # (asyncio primitives bind to a loop on Python < 3.10) -- confirm.
        cls.M_SEMLOCK = Semaphore(10)

    async def call_fetch_with_max_concurrent(self, index):
        """Call ``test_fetch(index)`` while holding one semaphore permit."""
        async with TestSemaphoreGet.M_SEMLOCK:
            return await test_fetch(index)


def blocking_io():
    """Synchronously read and return the contents of /tmp/test_file.txt."""
    with open("/tmp/test_file.txt", "r", encoding="utf-8") as fp:
        return fp.read()


async def test_call_blocking_io():
    """Run ``blocking_io`` in the loop's default executor and return its result."""
    loop = asyncio.get_running_loop()
    # Passing None selects the event loop's default executor.
    data = await loop.run_in_executor(None, blocking_io)
    # print(data)
    return data


async def main():
    """Run the task, gather, semaphore and blocking-I/O demos in sequence."""
    # Demo: create_task
    await test_by_create_task()

    # Demo: gather
    await test_get_by_gather()

    # Demo: semaphore -- 100 fetches queued, capped by TestSemaphoreGet.M_SEMLOCK.
    TestSemaphoreGet.GlobalInit()
    ls_task = [
        TestSemaphoreGet().call_fetch_with_max_concurrent(index=index)
        for index in range(100)
    ]
    ls_ret = await asyncio.gather(*ls_task)
    print(f"结果: {ls_ret}")

    # Demo: blocking I/O in an executor
    ret = await test_call_blocking_io()
    print(f"ret: {ret}")

    # loop = asyncio.get_running_loop()
    # print(f"loop: {loop}")


if __name__ == "__main__":
    # Same execution order as the original top-level statements.

    # Sample run of a single fetch.
    asyncio.run(test_fetch(1))

    asyncio.run(test_call_greet())

    # Prepare the file that the blocking-I/O demo reads back.
    with open("/tmp/test_file.txt", "w", encoding="utf-8") as fp:
        fp.write("hello, test!")

    asyncio.run(main())
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2467355.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!