asyncio
- 协程简单来说就是一个死循环:在内部如果遇到等待操作,就会切换到别的代码去执行,当这个等待操作结束了,再继续回来执行。
- 并且协程的创建和销毁开销没有进程和线程的大。
事件循环
- 事件循环是每个 asyncio 应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。
- 在新版本中可以直接使用 asyncio.run() 来直接执行一个协程对象。
- 如果
asyncio.run()
的debug 为True
,事件循环将以调试模式运行。
- 如果
asyncio.get_running_loop
( ) :获取当前正在运行的事件循环,如果没有,抛出异常。asyncio.set_event_loop
(loop):将当前循环设置为指定的事件循环。asyncio.new_event_loop
( ) :创建一个新的事件循环
loop.run_until_complete
(future) :运行直到 future(协程对象) 执行完毕(获取返回值):获取当前事件循环,3.10 版本 开始这个函数将是loop.get_event_loop
()get_running_loop()
的别名loop.run_forever
( ):运行直到 stop 被调用。loop.stop
( ):停止事件循环。loop.close
( ) :关闭事件循环,所有相关回调,循环全部会被关闭。- 如果使用的是
asyncio.run()
:进行执行,则不需要手动关闭。
- 如果使用的是
loop.is_running
( ) :判断事件循环是否在运行。loop.is_closed
( ) :判断事件循环是否关闭。
创建任务
loop.create_future
( ):创建一个附加到事件循环的 future 对象- 官方推荐:这是在 asyncio 中创建 Futures 的首选方式。
- 推荐直接使用下面的
create_task
它是对future
的封装
- 推荐直接使用下面的
- 通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
- future实际上是一个容器,用于接收 task 的结果。
- 当一个 Future 对象 _被等待_,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
- 官方推荐:这是在 asyncio 中创建 Futures 的首选方式。
loop.create_task
( coro , *** , *name = None* ):安排协程的执行,返回一个 Task 对象。- 如果指定了 name,那么返回值将以 name 为准。
- 返回一个 Task 对象。当 Task 对象被等待,协程将保持等待(期间会切换到其他代码中执行)直到返回结果。
执行任务
asyncio.wait_for
(aw, timeout):等待一个协程对象执行,并且在timeout秒后抛出超时错误。(等待单个协程对象)- 如果timeout为
None
:表示等待直到运行完毕。
- 如果timeout为
asyncio.gather
(aws, return_exceptions=False):并发执行任务列表,获取的返回值和函数执行顺序一样。并发 运行 aws 序列中的 可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
如果 return_exceptions 为
True
,异常会和成功的结果一样处理,并聚合至结果列表。如果
gather()
_被取消_,所有被提交 (尚未完成) 的可等待对象也会 _被取消_。当用
loop.create_task()
:创建一个任务列表交给 gather 的时候,要用 * 解包进行执行。
asyncio.wait
(aws, ***, timeout=None, return_when=ALL_COMPLETED):并发运行阻塞等待直到return_when
条件成立(等待多个协程对象)- 返回两个 Task/Future 集合:
(done, pending)
。:done, pending = await asyncio.wait({aws})
- 相同:wait 和 gather 都是获取执行结果,但 wait 不会将 task 封装为 future,而 gatther 会。
- 所以gatther可以直接看到结果,而 wait 需要调用 result()
- 不同:
asyncio.wait
会返回两个值:done
和pending
,done
为已完成的协程Task
,pending
为超时未完成的协程Task
,需通过future.result
调用Task
的result
;- 而
asyncio.gather
返回的是所有已完成Task
的result
,不需要再进行调用或其他操作,就可以得到全部结果。
- 而
- 从 Python 3.8 开始,不推荐使用 asyncio.wait() 将协程对象显式传递,并计划在 Python 3.11 中删除。
常量 描述 FIRST_COMPLETED
函数将在任意可等待对象结束或取消时返回。 FIRST_EXCEPTION
函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
。ALL_COMPLETED
函数将在所有可等待对象结束或取消时返回。 - 返回两个 Task/Future 集合:
asyncio.as_completed
(aws, ***, timeout=None):并发地运行可迭代对象中的 可等待对象。 返回一个协程的迭代器。所返回的迭代器中,所获取到的下一个,总是剩余时间最少的。
import asyncio import random async def sl(): sleep = random.random() await asyncio.sleep(sleep) # 返回随机值 return sleep async def main(): # 创建一个任务列表 task_list = [sl() for v in range(5)] # 每次都返回剩余时间最少的 for coro in asyncio.as_completed(task_list): print(await coro) if __name__ == '__main__': asyncio.run(main())
asyncio.run_coroutine_threadsafe
(coro, loop):向指定事件循环提交一个协程向指定事件循环提交一个协程。(线程安全)
返回一个
concurrent.futures.Future
以等待来自其他 OS 线程的结果。此函数应该从另一个线程中调用,而非事件循环运行所在线程。
# 如果内部产生了异常,将会通知future future = asyncio.run_coroutine_threadsafe(coro, loop) assert future.result(timeout) == 3
asyncio.to_thread
(func, /, args, **kwargs):在线程中运行函数(在协程内执行一个不支持协程的函数)3.9 版本功能
task
- 更加推荐使用
create_task
它是对下面Future
的封装 支持更多方法 - 一个与
Future 类似
的对象,可运行 Python 协程。非线程安全。 - Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 _完成_,被挂起的协程将恢复执行。
- 事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。
asyncio.current_task
(loop=None):返回当前运行的Task
实例,如果没有正在运行的任务则返回None
。asyncio.all_tasks
(loop=None):返回事件循环所运行的未完成的Task
对象的集合。- 如果 loop 为
None
则会使用get_running_loop()
获取当前事件循环。
- 如果 loop 为
task.cancel
(msg=None):请求取消 task 对象,这将安排在下一轮事件循环中抛出一个CancelledError
异常给被封包的协程。task.cancelled
():如果 Task 对象 被取消 则返回True
。task.done
():如果 Task 对象已完成,则返回 True。task.result
():返回 Task 的结果。如果 Task 对象 _已完成_,其封包的协程的结果会被返回 (或者当协程引发异常时,该异常会被重新引发。)
如果 Task 对象 _被取消_,此方法会引发一个
CancelledError
异常。如果 Task 对象的结果还不可用,此方法会引发一个
InvalidStateError
异常。
task.exception
():返回 Task 对象的异常。- 如果所封包的协程引发了一个异常,该异常将被返回。如果所封包的协程正常返回则该方法将返回
None
。
- 如果所封包的协程引发了一个异常,该异常将被返回。如果所封包的协程正常返回则该方法将返回
task.add_done_callback
(callback, ***, *context=None*):添加一个回调,在 Task 执行完毕之后运行。- 可以用 functools.partial() 给回调函数传递参数,例如:
# 当函数完成时,调用打印,传入 Future: fut.add_done_callback(functools.partial(print, "Future:"))
task.remove_done_callback
(callback):从回调列表中删除回调。task.get_name
():返回 task 的名字。task.set_name
(value):设置 Task 名字
Future
- 一个 Future 代表一个异步运算的最终结果。
- Future 是一个 awaitable 对象。协程可以等待 Future 对象直到它们有结果或异常集合或被取消。
- 通常 Future 用于支持底层回调式代码(例如在协议实现中使用 asyncio transports) 与高层异步/等待式代码交互。
future.result()
:返回被设置的future结果future.set_result
(result):将 Future 标记为 完成 并设置结果。如果 Future 已经 完成 则抛出一个InvalidStateError
错误。future.set_exception
(exception):将 Future 标记为 完成 并设置一个异常。如果 Future 已经 完成 则抛出一个InvalidStateError
错误。future.done
()如果 Future 为已 完成 则返回True
。- 如果 Future 为 取消 或调用
set_result()
设置了结果或调用set_exception()
设置了异常,那么它就是 完成 。
- 如果 Future 为 取消 或调用
cancelled
():如果 Future 已 取消 则返回True
:这个方法通常在设置结果或异常前用来检查 Future 是否已 取消 。add_done_callback
(callback, ***, context=None):添加一个在 Future *完成* 时运行的回调函数。- 可以用 functools.partial() 给回调函数传递参数,例如:
# 当函数完成时,调用打印,传入 Future: fut.add_done_callback(functools.partial(print, "Future:"))
remove_done_callback
(callback):从回调列表中移除 callback 。返回被移除的回调函数的数量,通常为 1,除非一个回调函数被添加多次。cancel
(msg=None):取消 Future 并调度回调函数。如果 Future 已经 完成 或 取消 ,返回False
。- 否则将 Future 状态改为 取消 并在调度回调函数后返回
True
。
- 否则将 Future 状态改为 取消 并在调度回调函数后返回
exception
():返回 Future 已设置的异常。只有 Future 在 完成 时才返回异常(或者
None
,如果没有设置异常)。如果 Future 已 _取消_,方法会引发一个
CancelledError
异常。如果 Future 还没 完成 ,这个方法会引发一个
InvalidStateError
异常。get_loop
():返回 Future 对象已绑定的事件循环。
import asyncio
# 一个接收返回值的小栗子
async def func(fut):
# 等待两秒
await asyncio.sleep(2)
# 将传入的fut对象设置一个值
fut.set_result('这里返回一个结果!')
async def start():
# 这里获取当前事件循环 (asyncio.run外部创建的)
loop = asyncio.get_running_loop()
# 通过当前事件循环,创建一个fut任务,并且加入循环队列
fut = loop.create_future()
# 在事件循环中加入一个任务,并且将上面创建fut传递给任务
loop.create_task(func(fut))
# 一直等待上面的fut对象的结果,获取到结果会立即结束
# 如果没有任务修改fut,则会一直等待下去
result = await fut
print(result)
asyncio.run(start())
在协程内执行同步函数
- 事与愿违 有的时候必须调用同步函数 咋办?
- 比如我们找到了一个库项目必须使用 但这个库就只有同步方法.
- 可以通过创建一个进程池 将任务放入池内执行 这个过程不需要我们实现
import asyncio
from concurrent.futures import ProcessPoolExecutor # ThreadPoolExecutor 线程池
process_executor = ProcessPoolExecutor(20) # 创建20个进程的池
loop = asyncio.get_running_loop() # 获取事件循环 通过进程池来执行函数 防止阻塞主进程
await loop.run_in_executor(process_executor, sleep) # 用run_in_executor将一个任务放入池内执行 不会影响主进程的执行
def run_in_executor(self, executor, func, *args):
第三个是*args
会根据位置 传递给函数- 如果没有特殊需求可以使用
asyncio.to_thread()
线程安全
- asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用
threading
); - 这些同步原语的方法不接受 timeout 参数;请使用
asyncio.wait_for()
函数来执行带有超时的操作。
- 要理解线程安全就先理解什么是线程不安全:
- 当我们对一个变量用异步的方式每次递增,增加两百万次之后,可以发现每次的结果都是不一样的。
- 因为对变量进行增加的操作不是原子操作(原子操作:在操作期间不会被切换到其他线程中进行执行。)
- 这就可能导致一些意想不到的数据丢失问题,那么解决这个问题就要用到锁。
- 但用到锁必定会导致操作耗时的增加,另外一个线程可能就等着这个线程操作结束才会继续。
lock = asyncio.Lock()
:async with lock
:实现一个用于 asyncio 任务的互斥锁。 非线程安全。用来保证对共享资源的独占访问。lock.acquire
():获取锁,等到直到锁为 unlocked 并将其设为 True,锁的获取是 公平的: 被执行的协程将是最开始等待锁的协程。lock.release
():释放锁。lock.locked
():如果锁为 locked 则返回True
。
Event
asyncio.Event
:asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。(继续执行)event.wait
():等待直到事件被设置为 True 后继续执行event.set
():设置事件,事件将被设置为 Trueevent.clear
():清空事件,事件将被设置为 Falseevent.is_set
():如果事件已被设置返回 True
队列:先进先出
queue = asyncio.Queue (_maxsize=0_)
先进,先出(FIFO)队列:
如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于
0
的整数,则当队列达到 maxsize 时,await put()
将阻塞至某个元素被get()
取出。maxsize
:队列中可存放的元素数量。empty
():如果队列为空返回True
,否则返回False
。full
():如果有maxsize
个条目在队列中,则返回True
。- 如果队列用
maxsize=0
(默认)初始化,则full()
永远不会返回True
。
- 如果队列用
get
():从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。get_nowait
():立即返回一个队列中的元素,如果队列内有值,否则引发异常QueueEmpty
。join
():阻塞至队列中所有的元素都被接收和处理完毕。- 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用
task_done()
表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,join()
阻塞被解除。
- 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用
put
(item):添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。put_nowait
(item):不阻塞的放一个元素入队列。如果没有立即可用的空闲槽,引发QueueFull
异常。qsize
():返回队列用的元素数量。task_done
():表明前面排队的任务已经完成,即 get 出来的元素相关操作已经完成。- 由队列使用者控制。每个
get()
用于获取一个任务,任务最后调用task_done()
告诉队列,这个任务已经完成。 - 如果
join()
当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个put()
进队列的条目的task_done()
都被收到)。 - 如果被调用的次数多于放入队列中的项目数量,将引发
ValueError
。
- 由队列使用者控制。每个
队列变种:先进后出
lefoquery = asyncio.LifoQueue
Queue
的变体,先取出最近添加的条目(后进,先出)。
等待
coro = asyncio.sleep(1, result=3)
:当前面的延迟结束之后,将会返回一个 result 的值。
保护任务
asyncio.shield
(aw):保护一个可等待对象不被取消。
协程池
- 原生的官方包不支持协程池
import asyncio
class CoroutinePool:
def __init__(self, max_works):
self.max_works = max_works
self.tasks = []
self.task_queue = asyncio.Queue()
def submit_task(self, task, *args, **kwargs):
self.task_queue.put_nowait([task, args, kwargs])
async def __aenter__(self):
for i in range(self.max_works):
asyncio.create_task(self._task_handle())
self.tasks.append(asyncio.create_task(self._task_handle()))
return self
async def __aexit__(self, exc_type, exc, tb):
await self.task_queue.join()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
async def _task_handle(self):
while True:
task, args, kwargs = await self.task_queue.get()
await task(*args, **kwargs)
self.task_queue.task_done()
async def f(i):
await asyncio.sleep(1)
print(i, asyncio.current_task().get_name())
async def main():
async with CoroutinePool(5) as pool:
for i in range(10):
pool.submit_task(f, i)
if __name__ == "__main__":
asyncio.run(main())