English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Concurrencia en Python2Uso de asyncio para manejar la concurrencia

asyncio

en Python 2La era, la programación de red de alto rendimiento se realizaba principalmente utilizando las bibliotecas Twisted、Tornado y Gevent, pero su código asincrónico no es compatible entre sí ni puede ser portado. Como se mencionó en el capítulo anterior, Gvanrossum quería que Python 3 Implementa una biblioteca de coroutines nativa basada en generadores, que tiene内置对异步IO的支持,这就是asyncio,它在Python 3.4Se introdujo en la biblioteca estándar.

El paquete asyncio utiliza coroutines impulsadas por un ciclo de eventos para implementar la concurrencia.

El paquete asyncio se conoció con el nombre en clave "Tulip" (tulipán) antes de ser introducido en la biblioteca estándar, por lo que a menudo verás este nombre de la flor al buscar información en línea.

¿Qué es el ciclo de eventos?63

Para Python, el paquete asyncio, que se añadió a la biblioteca estándar antes de su introducción, se conoce con el nombre en clave "Tulip" (tulipán). Por lo tanto, al buscar información en línea, a menudo verás este nombre de la flor./O ya está listo para leer y/o escrito como "cuando A ocurre" (a través del módulo selectors). Además de GUI y I/O, el ciclo de eventos también se utiliza a menudo para ejecutar código en otros hilos o subprocesos y usar el ciclo de eventos como mecanismo de regulación (por ejemplo, multitarea cooperativa). Si恰好 entiendes el GIL de Python, el ciclo de eventos es muy útil en lugares que necesitan liberar el GIL.

Hilos y coroutines

Vamos a ver dos fragmentos de código, uno implementado con el módulo threading y otro con el paquete asyncio.

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # Esta clase define un objeto mutable, utilizado para controlar hilos desde el exterior
 go = True
def spin(msg, signal): # Esta función se ejecutará en un hilo separado, el parámetro signal es una instancia de la clase Signal definida anteriormente
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # The itertools.cycle function generates elements from the specified sequence repeatedly and continuously
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # Use the backspace character to move the cursor back to the beginning of the line
  time.sleep(.)1) # Cada 0.1 refresca una vez por segundo
  if not signal.go: # Si el atributo go no es True, sale del bucle
   break
 write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning
def slow_function(): # Simula una operación de larga duración
 # Pretend to wait I/O some time
 time.sleep(3) # Llamar a sleep bloqueará la línea principal, hacer esto es para liberar el GIL, crear hilos dependientes
 return 42
def supervisor(): # Esta función configura hilos dependientes, muestra objetos de hilos, realiza cálculos de tiempo de ejecución y finalmente mata el proceso
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # Display the thread object Output spinner object: <Thread(Thread-1, initial)>
 spinner.start() # Start the subordinate process
 result = slow_function() # Run the slow_function line, blocking the main thread. At the same time, the book thread rotates the pointer in animation form
 signal.go = False
 spinner.join() # Wait for the spinner thread to end
 return result
def main():
 result = supervisor() 
 print('Answer', result)
if __name__ == '__main__':
 main()

Run it, and the result will be roughly like this:

This is an animation, the line before 'thinking' is moving (I increased the sleep time for screen recording)

Python does not provide an API to terminate threads, so if you want to close a thread, you must send a message to it. Here we use the signal.go attribute: set it to False in the main thread, and the spinner thread will receive it, then exit

Now let's look at the version using the asyncio package:

# spinner_asyncio.py
# Display text-based rotating pointer in animation form through coroutine
import asyncio
import itertools
import sys
@asyncio.coroutine # Coroutines intended for handling by asyncio must use @asyncio.coroutine decorator
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # The itertools.cycle function generates elements from the specified sequence repeatedly and continuously
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # Use the backspace character to move the cursor back to the beginning of the line
  try:
   yield from asyncio.sleep(0.)1) # Use yield from asyncio.sleep(0.)1) instead of time.sleep(.)1Such hibernation will not block the event loop
  except asyncio.CancelledError: # If the spin function wakes up and throws an asyncio.CancelledError exception, the reason is that a cancellation request was made
   break
 write(' ' * len(status) + '\x08' * len(status)) # Use spaces to clear the status message, move the cursor back to the beginning
@asyncio.coroutine
def slow_function(): # 5 Now this function is a coroutine, use sleep to pretend to perform I/O operation, use yield from to continue the event loop
 # Pretend to wait I/O some time
 yield from asyncio.sleep(3) # This expression passes control to the main loop, and this coroutine will resume after the sleep ends
 return 42
@asyncio.coroutine
def supervisor(): # This function is also a coroutine, so it can use yield from to drive slow_function
 spinner = asyncio.async(spin('thinking!')) # The asyncio.async() function schedules the execution time of the coroutine, wraps the spin coroutine with a Task object, and returns immediately
 print('spinner object:', spinner) # Task object, output similar to spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # Drive the slow_function() function, get the return value after it ends. At the same time, the event loop continues to run,
 # Because the slow_function function uses yield from asyncio.sleep(3) expression passes control to the main loop
 result = yield from slow_function()
 # The Task object can be canceled; after cancellation, an asyncio.CancelledError exception will be thrown at the yield where the coroutine is currently paused
 # The coroutine can catch this exception, can delay cancellation, or even refuse cancellation
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # Get the reference to the event loop
 # Drive the supervisor coroutine to completion; the return value of this coroutine is the return value of this call
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Answer', result)
if __name__ == '__main__':
 main()

A menos que desee bloquear la línea principal, congelar el ciclo de eventos o toda la aplicación, no utilice time.sleep() en coroutines de asyncio.

Si la coroutine necesita hacer nada durante un período de tiempo, debe usar yield from asyncio.sleep(DELAY).

El decorador @asyncio.coroutine no es una exigencia, pero se recomienda hacerlo porque así se puede resaltar la coroutine en el código, si aún no ha producido un valor, la coroutine liberará la memoria de basura (lo que significa que la operación no se ha completado, puede haber defectos) y emitirá una advertencia. Este decorador no activará la coroutine.

Los resultados de ejecución de estos dos bloques de código son básicamente idénticos, ahora veamos las principales diferencias entre los dos bloques de código: el supervisor.

  1. El objeto asyncio.Task es casi equivalente al objeto threading.Thread (los objetos Task son como hilos verdes en las bibliotecas de multitarea de escritura)
  2. El objeto Task se utiliza para impulsar las coroutines, mientras que el objeto Thread se utiliza para llamar a objetos invocables.
  3. El objeto Task no se instancia por sí mismo, sino que se obtiene pasando la coroutine a la función asyncio.async(...) o al método loop.create_task(...).
  4. El objeto Task obtenido ya tiene una programación de tiempo de ejecución; el ejemplo de Thread debe llamar al método start para informar explícitamente que debe ejecutarse.
  5. En la función supervisor de versión de hilo, slow_function es una función común, llamada directamente por el hilo, mientras que la función slow_function de versión asincrónica es una coroutine, impulsada por yield from.
  6. No hay API que pueda finalizar un hilo desde fuera, porque los hilos pueden interrumpirse en cualquier momento. Mientras que si se desea finalizar una tarea, se puede usar el método de instancia Task.cancel(), lanzar una excepción CancelledError en el interior de la coroutine. La coroutine puede capturar esta excepción en el punto de yield pausado para manejar la solicitud de finalización.
  7. Las coroutines de supervisor deben ejecutarse en la función main utilizando el método loop.run_until_complete.

Una de las ventajas clave de las coroutines en comparación con los hilos es que los hilos deben recordar mantener el candado para proteger las partes importantes del programa, evitar que las operaciones múltiples se interrumpan durante el proceso de ejecución y evitar que el agua y las montañas estén en un estado de Xiaoxiao. Por defecto, las coroutines protegerán, y debemos explicitamente producir (usar yield o yield from para ceder el control) para que el resto del programa funcione.

asyncio.Future: se故意 no bloquea

La clase asyncio.Future y la clase concurrent.futures.Future tienen una interfaz básicamente idéntica, pero la forma de implementación es diferente y no se pueden intercambiar.

El artículo anterior [python concurrente 1: manejo de concurrencia con futures () hemos presentado el objeto future de concurrent.futures.Future. En concurrent.futures.Future, el futuro solo es el resultado de la programación de la ejecución de algo. En el paquete asyncio, el método BaseEventLoop.create_task(...) recibe un coroutine, programa su tiempo de ejecución y luego devuelve una instancia de asyncio.Task (también es una instancia de asyncio.Future, ya que Task es una subclase de Future y se utiliza para encapsular coroutines. (En concurrent.futures.Future, la operación similar es Executor.submit(...)).

Al igual que la clase concurrent.futures.Future, la clase asyncio.Future también proporciona

  1. .done() devuelve un valor booleano que indica si el Future ya se ha ejecutado.
  2. El método .add_done_callback() tiene solo un parámetro, que es un objeto invocable. Después de que el Future finalice, se llamará a este objeto.
  3. El método .result() no tiene parámetros, por lo que no se puede especificar el tiempo de expiración. Si se llama al método .result() y el tiempo aún no ha finalizado, se lanzará la excepción asyncio.InvalidStateError.

Llamar al método result() en el objeto Future de la clase concurrent.futures.Future después de que finalice la ejecución devolverá el resultado del objeto invocable o lanzará la excepción que se lanzó al invocar el objeto invocable. Si se llama al método f.result() cuando el Future no ha finalizado, se bloqueará la línea de ejecución del solicitante hasta que se devuelva un resultado. En este momento, el método result() también puede recibir un parámetro timeout. Si el Future no se completa en el tiempo especificado, se lanzará la excepción TimeoutError.

Cuando usamos asyncio.Future, generalmente usamos yield from para obtener los resultados, en lugar de usar el método result(). La expresión yield from genera valores de retorno en el hilo deCoroutine en pausa, y luego se reanuda el proceso de ejecución.

La clase asyncio.Future tiene como objetivo usarse junto con yield from, por lo que generalmente no es necesario usar los siguientes métodos:

  1. No es necesario llamar a my_future.add_down_callback(...), porque se puede colocar directamente la operación que se desea realizar después de que el future se ejecute en la parte posterior de la expresión yield from my_future en la coroutine. (Debido a que las coroutines pueden pausar y reanudar funciones)
  2. No es necesario llamar a my_future.result(), porque el resultado producido por yield from es (result = yield from my_future)

En el paquete asyncio, se puede usar yield from para producir resultados de un objeto asyncio.Future. Esto significa que podemos escribir así:

res = yield from foo() # foo puede ser una función coroutine, o una función común que devuelve una instancia de Future o task

asyncio.async(...)* función

asyncio.async(coro_or_future, *, loop=None)

Esta función unifica coroutine y Future: el primer parámetro puede ser cualquiera de los dos. Si es un objeto Future o Task, se devuelve directamente; si es una coroutine, la función async llama automáticamente al método loop.create_task(...) para crear un objeto Task. El parámetro loop es opcional y se utiliza para pasar el ciclo de eventos; si no se pasa, la función async obtiene el objeto ciclo de eventos llamando a la función asyncio.get_event_loop().

BaseEventLoop.create_task(coro)

Este método planifica el tiempo de ejecución de la coroutine y devuelve un objeto asyncio.Task. Si se llama a este método en una subclase personalizada de BaseEventLoop, el objeto devuelto puede ser una instancia de alguna clase compatible con Task de una biblioteca externa.

El método BaseEventLoop.create_task() está disponible solo en Python3.4.2 y versiones posteriores están disponibles. Python3.3 Sólo se puede usar la función asyncio.async(...).
Si desea experimentar future y coroutines en la consola de Python o en scripts de prueba pequeños, puede usar el siguiente fragmento:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

Descargar usando asyncio y el paquete aiohttp

Ahora que hemos entendido los fundamentos de asyncio, es hora de reescribir lo que escribimos en nuestro artículo anterior [python concurrente 1:Usar futures para manejar la descarga concurrente de banderas ()

Veamos el código primero:

import asyncio
import aiohttp # Se necesita pip install aiohttp
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # Sabemos que los coroutines deben usar el decorador asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # Las operaciones bloqueantes se implementan mediante coroutines, y el código del cliente delega la responsabilidad a las coroutines mediante yield from para operaciones asíncronas
 resp = yield from aiohttp.request('GET', url) 
 # La lectura también es una operación asíncrona
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # Esta función también debe ser un coroutine, ya que utiliza yield from
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower()) + '.gif')}
 return cc
def download_many(cc_list):
 loop = asyncio.get_event_loop() # Obtener la referencia a la implementación subyacente del número de eventos
 to_do = [download_one(cc) for cc in sorted(cc_list)] # Llamar a download_one para obtener cada bandera y construir una lista de objetos de generador
 # Aunque el nombre de la función es wait, no es una función bloqueante; wait es un coroutine, que finaliza cuando todos los coroutines entregados a él se han ejecutado
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # Ejecutar el ciclo de eventos hasta que wait_coro finalice; durante la ejecución del ciclo de eventos, este script se bloqueará aquí
 loop.close() # Cerrar el ciclo de eventos
 return len(res)
if __name__ == '__main__':
 main(download_many)

La descripción general de la ejecución de este código es la siguiente:

  1. En la función download_many, se obtiene un ciclo de eventos y se procesan varios objetos de coroutine generados por la función download_one
  2. El ciclo de eventos de asyncio activa cada coroutine una vez
  3. Cuando el coroutine (get_flag) en el código del cliente delega la responsabilidad a los coroutines en la biblioteca (aiohttp.request) utilizando yield from, el control se devuelve al ciclo de eventos, y se ejecuta el coroutine programado anteriormente
  4. El bucle de eventos recibe notificaciones a través de una API de bajo nivel basada en callbacks después de que se completan las operaciones bloqueantes.
  5. Después de recibir la notificación, el bucle principal envía los resultados a la coroutine en pausa
  6. La coroutine avanza hasta la siguiente expresión yield from, por ejemplo, get_flag yield from resp.read(). El bucle de eventos recupera el control, repitiendo el4~6paso, hasta que el ciclo termine.

En la función download_many, usamos la función asyncio.wait(...), que es una coroutine, y los parámetros de la coroutine son un objeto iterable compuesto por futuros o coroutines; wait encapsula cada coroutine en un objeto Task. El resultado final es que todos los objetos procesados por wait se convierten en instancias de la clase Future.

wait es una función de coroutine, por lo que devuelve un objeto de coroutine o generador; el variable waite_coro almacena este tipo de objeto

El método loop.run_until_complete toma como parámetro un futuro o una coroutine. Si es una coroutine, el método run_until_complete es similar a la función wait, y encapsula la coroutine en un objeto Task. Aquí, el método run_until_complete encapsula wait_coro en un objeto Task, impulsado por yield from. Después de que wait_coro se ejecute, devuelve dos parámetros: el primer parámetro es el futuro terminado y el segundo parámetro es el futuro no terminado.

<section class="caption">esperar</La sección> tiene dos parámetros nombrados, timeout y return_when, que si se configuran pueden devolver futuros no terminados.

Tal vez también hayas notado que hemos rewritten la función get_flags, porque la biblioteca requests que usamos anteriormente ejecutaba I/O/Operación O. Para usar el paquete asyncio, debemos cambiar la función a una versión asíncrona.

Consejo útil

Si encuentras que el código se hace difícil de entender después de usar coroutines, puedes seguir el consejo del padre de Python (Guido van Rossum) y fingir que no existe yield from.

Tomemos este fragmento de código como ejemplo:

@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# Eliminar yield form
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# Ahora es más claro, ¿no?

conocimientos

al usar yield from en la API del paquete asyncio, hay un detalle que prestar atención:

al usar el paquete asyncio, el código asíncrono que escribimos contiene subprocesos impulsados por asyncio en sí mismo (generadores delegados), y los generadores finalmente delegan la responsabilidad a los subprocesos del paquete asyncio o a bibliotecas de terceros. Este método es equivalente a construir un tubo, permitiendo que el ciclo de eventos de asyncio impulse la ejecución de la I/O asíncrona subyacente./la función de biblioteca O.

evitar llamadas bloqueantes

primero veamos un gráfico, que muestra el retraso en la lectura de datos por parte de la computadora desde diferentes medios de almacenamiento:

a través de este gráfico, podemos ver que las llamadas bloqueantes son un desperdicio gigante para el CPU. ¿Hay alguna manera de evitar que las llamadas bloqueantes interrumpan toda la aplicación?

hay dos métodos:

  1. ejecutar cada operación bloqueante en un hilo separado
  2. convertir cada operación bloqueante en una llamada asíncrona no bloqueante

por supuesto, recomendamos la segunda solución, ya que la primera es demasiado costosa si cada conexión utiliza un hilo.

segunda, podemos implementar programación asíncrona utilizando generadores como subprocesos. Para el ciclo de eventos, llamar a la función de devolución de llamada es similar a llamar al método .send() en un subproceso en pausa. Los subprocesos en pausa consumen mucha menos memoria que los hilos.

ahora, debes entender por qué el script flags_asyncio.py es mucho más rápido que flags.py.

debido a que flags.py descarga de manera sincrónica, cada descarga requiere decenas de miles de millones de ciclos de CPU para esperar el resultado. Mientras tanto, en flags_asyncio.py, al llamar al método loop.run_until_complete en la función download_many, el ciclo de eventos impulsa a los subprocesos de descarga_one, que se ejecutan hasta la expresión yield from, que a su vez impulsa a los subprocesos get_flag hasta la primera expresión yield from, donde se llama a la función aiohttp.request(). Estas llamadas no bloquean, por lo que en unos pocos segundos se pueden comenzar todas las solicitudes.

mejorar el script de descarga de asyncio

ahora mejoramos el archivo flags_asyncio.py, agregando manejo de excepciones y un contador

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# 自定义异常用于包装其他HTTP或网络异常,并获取country_code,以便报告错误
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # 此协程有三种返回结果:
 # 1. 返回下载到的图片
 # 2. HTTP 响应为404 . 当返回时,抛出web.HTTPNotFound 异常
 # 3. 当返回其他HTTP状态码时, 抛出aiohttp.HttpProcessingError
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # semaphore 参数是 asyncio.Semaphore 类的实例
 # Semaphore 类是同步装置,用于限制并发请求
 try:
  with (yield from semaphore):
    # 在yield from 表达式中把semaphore 当成上下文管理器使用,防止阻塞整个系统
    # 如果semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞
    image = yield from get_flag(cc)
    # 退出with语句后 semaphore 计数器的值会递减,
    # 解除阻塞可能在等待同一个semaphore对象的其他协程实例
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # 创建一个 asyncio.Semaphore 实例,最多允许激活MAX_CONCUR_REQ个使用这个计数器的协程
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # 多次调用 download_one 协程,创建一个协程对象列表
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # Obtener un iterador que devuelva el futuro después de que el futuro se ejecute
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # Iterar sobre los futuros permitidos a terminar 
  try:
   res = yield from future # Obtener el resultado del objeto asyncio.Future (también se puede llamar future.result)
  except FetchError as exc:
   # Todas las excepciones lanzadas se envuelven en un objeto FetchError
   country_code = exc.country_code
   try:
    # Intenta obtener el mensaje de error de la excepción original (__cause__)
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # Si no se encuentra el mensaje de error en la excepción original, se utiliza el nombre de la clase de la excepción conectada como mensaje de error
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Error for {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter
def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

Dado que las solicitudes iniciadas por el coroutine son rápidas, para evitar que se envíen demasiadas solicitudes concurrentes al servidor y sobrecargarlo, creamos una instancia de asyncio.Semaphore en la función download_coro y la pasamos a la función download_one.

<secion class="caption">Semaphore</El objeto section> mantiene un contador interno, si se llama al método de coroutine .acquire() en el objeto, el contador se disminuye; si se llama al método de coroutine .release() en el objeto, el contador se aumenta. El valor del contador se establece en el momento de la inicialización.

Si el contador es mayor de 0, la llamada al método .acquire() no bloqueará, si el contador es 0, el método .acquire() bloqueará el coroutine que llama a este método hasta que otro coroutine llame al método .release() en el mismo objeto Semaphore, aumentando el contador.

En el código anterior, no llamamos manualmente a los métodos .acquire() o .release(), sino que en la función download_one usamos el semaphore como un administrador de contexto:

with (yield from semaphore):
 image = yield from get_flag(cc)

Este código garantiza que en cualquier momento no se iniciará más de MAX_CONCUR_REQ coroutines get_flag.

Usar la función asyncio.as_completed

Debido a que se debe usar yield from para obtener el resultado del futuro producido por la función asyncio.as_completed, la función as_completed debe llamarse dentro de un coroutine. Ya que download_many debe pasarse como parámetro a la función main no coroutine, he añadido un nuevo coroutine downloader_coro, para que la función download_many solo se utilice para configurar el ciclo de eventos.

Usar el objeto Executor para evitar bloqueos en el ciclo de eventos

Regresemos al gráfico de demoras en la lectura de datos desde diferentes medios de almacenamiento del ordenador, hay algo que destacar en tiempo real, es que el acceso al sistema de archivos local también bloquea.

En el código anterior, la función save_flag bloquea la única línea compartida entre el código del cliente y el ciclo de eventos de asyncio, por lo que al guardar el archivo, toda la aplicación se detiene. Para evitar este problema, se puede usar el método run_in_executor del objeto del ciclo de eventos.

El ciclo de eventos de asyncio mantiene un objeto ThreadPoolExecutor en segundo plano, podemos llamar al método run_in_executor para enviar un objeto invocable a su ejecución.

A continuación, se muestra el código modificado:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # Aquí se realiza el cambio
  loop = asyncio.get_event_loop() # Obtiene la referencia al ciclo de eventos
  loop.run_in_executor(None, save_flag, image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

El primer parámetro del método run_in_executor es una instancia de Executor; si se establece en None, se utiliza la instancia predeterminada ThreadPoolExecutor del ciclo de eventos.

Desde los callbacks hasta los futures hasta las coroutines

Antes de conocer las coroutines, podríamos tener cierta comprensión de los callbacks, entonces, ¿qué mejoras tienen las coroutines en comparación con los callbacks?

Estilo de código de callback en Python:

def stage1(response1):
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2):
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3):
  step3(response3) 
api_call1(request1, stage1)

Defectos del código anterior:

  1. Puede aparecer el infierno de los callbacks
  2. El código es difícil de leer

En este problema, las coroutines pueden desempeñar un papel muy importante. Si se cambia al código asíncrono realizado con coroutines y yield from, el ejemplo de código sería el siguiente:

@asyncio.coroutine
def three_stages(request1):
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

En comparación con el código anterior, este código es mucho más fácil de entender. Si se realiza una llamada asíncrona a api_call1,api_call2,api_call3 Se lanzará una excepción, por lo que se puede colocar la expresión yield from correspondiente en el bloque try/except el bloque de manejo de excepciones.

Es necesario acostumbrarse al expresión yield from para las coroutines y las coroutines no pueden llamarse directamente, deben programarse explícitamente el tiempo de ejecución de las coroutines o usar la expresión yield from en otras coroutines programadas para activarla. Si no se utiliza loop.create_task(three_stages(request1)) entonces no sucederá nada.

A continuación, demos un ejemplo práctico para ilustrarlo:

Se realizan múltiples solicitudes de descarga cada vez

Modificamos el código de descarga de la bandera anterior para que, mientras descarga la bandera, también pueda obtener el nombre del país y usarlo al guardar la imagen.
Usamos la coroutine y yield from para resolver este problema:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  si 'json' está en ctype o url termina en 'json':
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = ""/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

En este fragmento de código, llamamos a get_flag y get_country en dos bloques with controlados por semaphore en la función download_one para ahorrar tiempo.

La declaración de return de get_flag se agrega en el nivel superior con paréntesis porque la prioridad del operador de () es alta, ejecutará primero la expresión yield from dentro del paréntesis. Si no se agrega, se informará un error de sintaxis

Agregar () es equivalente a

image = yield from http_get(url)
return image

Si no se agregan (), el programa se interrumpirá en yield from, entregando el control, en este momento, el uso de return informará un error de sintaxis.

Resumen

En esta entrada, discutimos:

  1. Se compara un programa multithreaded y una versión de asyncio, se explica la relación entre los threads y las tareas asíncronas
  2. Se compara la diferencia entre las clases asyncio.Future y concurrent.futures.Future
  3. Cómo utilizar la programación asíncrona para gestionar la alta concurrencia en aplicaciones de red
  4. En la programación asíncrona, en comparación con las devoluciones de llamada, las coroutines mejoran significativamente el rendimiento

Esto es todo el contenido de este artículo, espero que sea útil para su aprendizaje y que todos nos apoyen en el tutorial de alarido.

Declaración: Este artículo se ha redactado en línea, pertenece al propietario original, el contenido se ha contribuido y subido por los usuarios de Internet, este sitio web no posee los derechos de propiedad, no se ha realizado un procesamiento editorial manual y no asume la responsabilidad legal relevante. Si encuentra contenido sospechoso de infracción de derechos de autor, le invitamos a enviar un correo electrónico a: notice#w3Declaración: El contenido de este artículo se ha obtenido de la red, pertenece al propietario original, el contenido se ha contribuido y subido por los usuarios de Internet, este sitio web no posee los derechos de propiedad, no se ha realizado un procesamiento editorial manual y no asume la responsabilidad legal relevante. Si encuentra contenido sospechoso de infracción de derechos de autor, le invitamos a enviar un correo electrónico a: notice#w

Te gustará