嘘~ 正在从服务器偷取页面 . . .

Asyncio


asyncio

  • 协程简单来说就是一个死循环在内部如果遇到等待操作,就会切换到别的代码去执行,当这个等待操作结束了,再继续回来执行。
  • 并且协程的创建和销毁开销没有进程和线程的大。

事件循环

  1. 事件循环是每个 asyncio 应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作,并运行子进程。
  2. 在新版本中可以直接使用 asyncio.run() 来直接执行一个协程对象。
    • 如果 asyncio.run()debugTrue,事件循环将以调试模式运行。
  • asyncio.get_running_loop( ) :获取当前正在运行的事件循环,如果没有,抛出异常。
  • asyncio.set_event_loop(loop):将当前循环设置为指定的事件循环。
  • asyncio.new_event_loop( ) :创建一个新的事件循环

  • loop.run_until_complete(future) :运行直到 future(协程对象) 执行完毕(获取返回值)
  • loop.get_event_loop():获取当前事件循环,3.10 版本 开始这个函数将是 get_running_loop()的别名
  • loop.run_forever( ):运行直到 stop 被调用。
  • loop.stop( ):停止事件循环。
  • loop.close( ) :关闭事件循环,所有相关回调,循环全部会被关闭。
    • 如果使用的是 asyncio.run():进行执行,则不需要手动关闭。
  • loop.is_running( ) :判断事件循环是否在运行。
  • loop.is_closed( ) :判断事件循环是否关闭。

创建任务

  • loop.create_future( ):创建一个附加到事件循环的 future 对象
    • 官方推荐:这是在 asyncio 中创建 Futures 的首选方式。
      • 推荐直接使用下面的 create_task 它是对 future的封装
    • 通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
    • future实际上是一个容器,用于接收 task 的结果。
    • 当一个 Future 对象 _被等待_,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
  • loop.create_task( coro , *** , *name = None* ):安排协程的执行,返回一个 Task 对象。
    • 如果指定了 name,那么返回值将以 name 为准。
    • 返回一个 Task 对象。当 Task 对象被等待,协程将保持等待(期间会切换到其他代码中执行)直到返回结果。

执行任务

  • asyncio.wait_for(aw, timeout):等待一个协程对象执行,并且在timeout秒后抛出超时错误。(等待单个协程对象

    • 如果timeoutNone:表示等待直到运行完毕。
  • asyncio.gather(aws, return_exceptions=False):并发执行任务列表,获取的返回值和函数执行顺序一样。

    • 并发 运行 aws 序列中的 可等待对象

      如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。

      如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

      如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。

      如果 gather() _被取消_,所有被提交 (尚未完成) 的可等待对象也会 _被取消_。

    • 当用loop.create_task():创建一个任务列表交给 gather 的时候,要用 * 解包进行执行。

  • asyncio.wait(aws, ***, timeout=None, return_when=ALL_COMPLETED):并发运行阻塞等待直到return_when条件成立(等待多个协程对象

    • 返回两个 Task/Future 集合: (done, pending)。:done, pending = await asyncio.wait({aws})

    • 相同wait 和 gather 都是获取执行结果,但 wait 不会将 task 封装为 future,而 gatther 会。
      • 所以gatther可以直接看到结果,而 wait 需要调用 result()
    • 不同asyncio.wait 会返回两个值:donependingdone 为已完成的协程 Taskpending 为超时未完成的协程 Task,需通过 future.result 调用 Taskresult
      • asyncio.gather 返回的是所有已完成 Taskresult,不需要再进行调用或其他操作,就可以得到全部结果。
    • 从 Python 3.8 开始,不推荐使用 asyncio.wait() 将协程对象显式传递,并计划在 Python 3.11 中删除。

    常量 描述
    FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回。
    FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED
    ALL_COMPLETED 函数将在所有可等待对象结束或取消时返回。

  • asyncio.as_completed(aws, ***, timeout=None):并发地运行可迭代对象中的 可等待对象。 返回一个协程的迭代器。

    • 所返回的迭代器中,所获取到的下一个,总是剩余时间最少的。

      import asyncio
      import random
      
      async def sl():
          sleep = random.random()
          await asyncio.sleep(sleep)
          # 返回随机值
          return sleep
      
      async def main():
          # 创建一个任务列表
          task_list = [sl() for v in range(5)]
          # 每次都返回剩余时间最少的
          for coro in asyncio.as_completed(task_list):
              print(await coro)
      
      if __name__ == '__main__':
          asyncio.run(main())
  • asyncio.run_coroutine_threadsafe(coro, loop):向指定事件循环提交一个协程

    • 向指定事件循环提交一个协程。(线程安全)

      返回一个 concurrent.futures.Future 以等待来自其他 OS 线程的结果。

      此函数应该从另一个线程中调用,而非事件循环运行所在线程。

      # 如果内部产生了异常,将会通知future
      future = asyncio.run_coroutine_threadsafe(coro, loop)
          
      assert future.result(timeout) == 3
  • asyncio.to_thread(func, /, args, **kwargs):在线程中运行函数(在协程内执行一个不支持协程的函数)3.9 版本功能


task

  • 更加推荐使用 create_task 它是对下面Future 的封装 支持更多方法
  • 一个与 Future 类似 的对象,可运行 Python 协程。非线程安全。
  • Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 _完成_,被挂起的协程将恢复执行。
  • 事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

  • asyncio.current_task(loop=None):返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None

  • asyncio.all_tasks(loop=None):返回事件循环所运行的未完成的 Task 对象的集合。

  • task.cancel(msg=None):请求取消 task 对象,这将安排在下一轮事件循环中抛出一个 CancelledError 异常给被封包的协程。

  • task.cancelled():如果 Task 对象 被取消 则返回 True

  • task.done():如果 Task 对象已完成,则返回 True。

  • task.result():返回 Task 的结果。

    • 如果 Task 对象 _已完成_,其封包的协程的结果会被返回 (或者当协程引发异常时,该异常会被重新引发。)

      如果 Task 对象 _被取消_,此方法会引发一个 CancelledError 异常。

      如果 Task 对象的结果还不可用,此方法会引发一个 InvalidStateError 异常。

  • task.exception():返回 Task 对象的异常。

    • 如果所封包的协程引发了一个异常,该异常将被返回。如果所封包的协程正常返回则该方法将返回 None
  • task.add_done_callback(callback, ***, *context=None*):添加一个回调,在 Task 执行完毕之后运行。

    • 可以用 functools.partial() 给回调函数传递参数,例如:
    # 当函数完成时,调用打印,传入 Future:
    fut.add_done_callback(functools.partial(print, "Future:"))
  • task.remove_done_callback(callback):从回调列表中删除回调。

  • task.get_name():返回 task 的名字。

  • task.set_name(value):设置 Task 名字


Future

  • 一个 Future 代表一个异步运算的最终结果。
  • Future 是一个 awaitable 对象。协程可以等待 Future 对象直到它们有结果或异常集合或被取消。
  • 通常 Future 用于支持底层回调式代码(例如在协议实现中使用 asyncio transports) 与高层异步/等待式代码交互。

  • future.result():返回被设置的future结果

  • future.set_result(result):将 Future 标记为 完成 并设置结果。如果 Future 已经 完成 则抛出一个 InvalidStateError 错误。

  • future.set_exception(exception):将 Future 标记为 完成 并设置一个异常。如果 Future 已经 完成 则抛出一个 InvalidStateError 错误。

  • future.done()如果 Future 为已 完成 则返回 True

    • 如果 Future 为 取消 或调用 set_result() 设置了结果或调用 set_exception() 设置了异常,那么它就是 完成
  • cancelled():如果 Future 已 取消 则返回 True:这个方法通常在设置结果或异常前用来检查 Future 是否已 取消

  • add_done_callback(callback, ***, context=None):添加一个在 Future *完成* 时运行的回调函数。

    • 可以用 functools.partial() 给回调函数传递参数,例如:
    # 当函数完成时,调用打印,传入 Future:
    fut.add_done_callback(functools.partial(print, "Future:"))
  • remove_done_callback(callback):从回调列表中移除 callback 。返回被移除的回调函数的数量,通常为 1,除非一个回调函数被添加多次。

  • cancel(msg=None):取消 Future 并调度回调函数。如果 Future 已经 完成取消 ,返回 False

    • 否则将 Future 状态改为 取消 并在调度回调函数后返回 True
  • exception():返回 Future 已设置的异常。

    只有 Future 在 完成 时才返回异常(或者 None ,如果没有设置异常)。

    如果 Future 已 _取消_,方法会引发一个 CancelledError 异常。

    如果 Future 还没 完成 ,这个方法会引发一个 InvalidStateError 异常。

  • get_loop():返回 Future 对象已绑定的事件循环。

import asyncio
# 一个接收返回值的小栗子

async def func(fut):
    # 等待两秒
    await asyncio.sleep(2)
    # 将传入的fut对象设置一个值
    fut.set_result('这里返回一个结果!')

async def start():
    # 这里获取当前事件循环 (asyncio.run外部创建的)
    loop = asyncio.get_running_loop()

    # 通过当前事件循环,创建一个fut任务,并且加入循环队列
    fut = loop.create_future()

    # 在事件循环中加入一个任务,并且将上面创建fut传递给任务
    loop.create_task(func(fut))

    # 一直等待上面的fut对象的结果,获取到结果会立即结束
    # 如果没有任务修改fut,则会一直等待下去
    result = await fut
    print(result)

asyncio.run(start())

在协程内执行同步函数

  • 事与愿违 有的时候必须调用同步函数 咋办?
  • 比如我们找到了一个库项目必须使用 但这个库就只有同步方法.
  • 可以通过创建一个进程池 将任务放入池内执行 这个过程不需要我们实现
import asyncio
from concurrent.futures import ProcessPoolExecutor # ThreadPoolExecutor 线程池

process_executor = ProcessPoolExecutor(20) # 创建20个进程的池

loop = asyncio.get_running_loop()  # 获取事件循环 通过进程池来执行函数 防止阻塞主进程
await loop.run_in_executor(process_executor, sleep) # 用run_in_executor将一个任务放入池内执行 不会影响主进程的执行
  • def run_in_executor(self, executor, func, *args): 第三个是*args 会根据位置 传递给函数
  • 如果没有特殊需求可以使用asyncio.to_thread()

线程安全

  • asyncio 原语不是线程安全的,因此它们不应被用于 OS 线程同步 (而应当使用 threading);
  • 这些同步原语的方法不接受 timeout 参数;请使用 asyncio.wait_for() 函数来执行带有超时的操作。

  • 要理解线程安全就先理解什么是线程不安全:
    • 当我们对一个变量用异步的方式每次递增,增加两百万次之后,可以发现每次的结果都是不一样的。
    • 因为对变量进行增加的操作不是原子操作(原子操作:在操作期间不会被切换到其他线程中进行执行。
  • 这就可能导致一些意想不到的数据丢失问题,那么解决这个问题就要用到锁。
    • 但用到锁必定会导致操作耗时的增加,另外一个线程可能就等着这个线程操作结束才会继续。
  • lock = asyncio.Lock()async with lock:实现一个用于 asyncio 任务的互斥锁。 非线程安全。用来保证对共享资源的独占访问。
  • lock.acquire():获取锁,等到直到锁为 unlocked 并将其设为 True,锁的获取是 公平的: 被执行的协程将是最开始等待锁的协程。
  • lock.release():释放锁。
  • lock.locked():如果锁为 locked 则返回 True

Event

  • asyncio.Event:asyncio 事件可被用来通知多个 asyncio 任务已经有事件发生。(继续执行)
    • Event 对象会管理一个内部旗标,可通过 set() 方法将其设为 true 并通过 clear() 方法将其重设为 _false_。
    • wait() 方法会阻塞直至该旗标被设为 _true_。 该旗标初始时会被设为 _false_。
    • 这个具体用到可以在官网找到例子。
  • event.wait():等待直到事件被设置为 True 后继续执行
  • event.set():设置事件,事件将被设置为 True
  • event.clear():清空事件,事件将被设置为 False
  • event.is_set():如果事件已被设置返回 True

队列:先进先出

  • queue = asyncio.Queue (_maxsize=0_)

  • 先进,先出(FIFO)队列:

  • 如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于 0 的整数,则当队列达到 maxsize 时, await put() 将阻塞至某个元素被 get() 取出。

  • maxsize:队列中可存放的元素数量。

  • empty():如果队列为空返回 True ,否则返回 False

  • full():如果有 maxsize 个条目在队列中,则返回 True

    • 如果队列用 maxsize=0 (默认)初始化,则 full() 永远不会返回 True
  • get():从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。

  • get_nowait():立即返回一个队列中的元素,如果队列内有值,否则引发异常 QueueEmpty

  • join():阻塞至队列中所有的元素都被接收和处理完毕。

    • 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。
  • put(item):添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。

  • put_nowait(item):不阻塞的放一个元素入队列。如果没有立即可用的空闲槽,引发 QueueFull 异常。

  • qsize():返回队列用的元素数量。

  • task_done():表明前面排队的任务已经完成,即 get 出来的元素相关操作已经完成。

    • 由队列使用者控制。每个 get() 用于获取一个任务,任务最后调用 task_done() 告诉队列,这个任务已经完成。
    • 如果 join() 当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个 put() 进队列的条目的 task_done() 都被收到)。
    • 如果被调用的次数多于放入队列中的项目数量,将引发 ValueError

队列变种:先进后出

  • lefoquery = asyncio.LifoQueue

  • Queue 的变体,先取出最近添加的条目(后进,先出)。


等待

coro = asyncio.sleep(1, result=3):当前面的延迟结束之后,将会返回一个 result 的值。


保护任务

  • asyncio.shield(aw):保护一个可等待对象不被取消。

协程池

  • 原生的官方包不支持协程池
import asyncio


class CoroutinePool:
    def __init__(self, max_works):
        self.max_works = max_works
        self.tasks = []
        self.task_queue = asyncio.Queue()

    def submit_task(self, task, *args, **kwargs):
        self.task_queue.put_nowait([task, args, kwargs])

    async def __aenter__(self):
        for i in range(self.max_works):
            asyncio.create_task(self._task_handle())
            self.tasks.append(asyncio.create_task(self._task_handle()))

        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.task_queue.join()

        for task in self.tasks:
            task.cancel()

        await asyncio.gather(*self.tasks, return_exceptions=True)

    async def _task_handle(self):
        while True:
            task, args, kwargs = await self.task_queue.get()
            await task(*args, **kwargs)
            self.task_queue.task_done()


async def f(i):
    await asyncio.sleep(1)
    print(i, asyncio.current_task().get_name())


async def main():
    async with CoroutinePool(5) as pool:
        for i in range(10):
            pool.submit_task(f, i)


if __name__ == "__main__":
    asyncio.run(main())

文章作者: 林木木
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 林木木 !
评论
  目录