📜  如何在 Python3 中使用 ThreadPoolExecutor ?

📅  最后修改于: 2022-05-13 01:54:57.937000             🧑  作者: Mango

如何在 Python3 中使用 ThreadPoolExecutor ?

先决条件:多线程

线程允许代码并行, Python语言有两种方法来实现它的第一个是通过多处理模块,第二个是通过多线程模块。多线程非常适合加速 I/O 绑定任务,例如发出 Web 请求、数据库操作或读取/写入文件。与这种 CPU 密集型任务(如数学计算任务)相比,使用多处理受益最多。这是由于 GIL(全局解释器锁定)造成的。

从Python 3.2 开始,在Python中的concurrent.futures模块中引入了一个名为ThreadPoolExecutor的新类,以有效地管理和创建线程。但是等等,如果Python已经内置了一个线程模块,那么为什么要引入一个新模块。让我先回答这个问题。

  • 当线程数较少时,动态生成新线程不是问题,但如果我们处理许多线程,管理线程就变得非常麻烦。除此之外,创建如此多的线程在计算上效率低下,这将导致吞吐量下降。保持吞吐量的一种方法是预先创建并实例化一个空闲线程池,然后重用该池中的线程,直到所有线程都用完为止。这样可以减少创建新线程的开销。
  • 此外,池会跟踪和管理线程生命周期并代表程序员对其进行调度,从而使代码更简单,错误更少。

线程池执行器方法:

ThreadPoolExecutor 类公开了三个异步执行线程的方法。下面给出详细的解释。

  1. submit(fn, *args, **kwargs):它运行一个可调用或一个方法,并返回一个表示方法执行状态的 Future 对象。
  2. map(fn, *iterables, timeout = None, chunksize = 1) :
    • 它立即将方法和可迭代对象映射到一起,并将引发并发异常。如果在超时限制内未能这样做,则 futures.TimeoutError 。
    • 如果可迭代对象非常大,那么在使用 ProcessPoolExecutor 时,块大小大于 1 可以提高性能,但使用 ThreadPoolExecutor 则没有这样的优势,即可以保留其默认值。
  3. 关闭(等待 = True,*,cancel_futures = False):
    • 它向执行者发出信号,在期货执行完毕后释放所有资源。
    • 它必须在 exectuor.submit() 和 executor.map() 方法之前调用,否则会抛出 RuntimeError。
    • wait=True 使该方法在所有线程执行完毕并释放资源之前不返回。
    • cancel_futures=True 那么执行程序将取消所有尚未启动的未来线程。

示例 1:

下面的代码演示了 ThreadPoolExecutor 的使用,注意与线程模块不同,我们不必使用循环显式调用,使用列表跟踪线程或使用 join 等待线程进行同步,或在线程之后释放资源完成后,构造函数本身将所有内容都隐藏起来,从而使代码紧凑且无错误。

Python3
from concurrent.futures import ThreadPoolExecutor
from time import sleep
  
values = [3,4,5,6]
  
def cube(x):
    print(f'Cube of {x}:{x*x*x}')
  
  
if __name__ == '__main__':
    result =[]
    with ThreadPoolExecutor(max_workers=5) as exe:
        exe.submit(cube,2)
          
        # Maps the method 'cube' with a list of values.
        result = exe.map(cube,values)
      
    for r in result:
      print(r)


Python3
import requests
import time
import concurrent.futures
   
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
   
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
   
# Download images 1 by 1 => slow
for img in img_urls:
  download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')
   
print('*'*50)
   
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
   
# Fetching images concurrently thus speeds up the download.
with concurrent.futures.ThreadPoolExecutor(3) as executor:
    executor.map(download_image, img_urls)
   
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')


输出:

Output: 
Cube of 2:8
Cube of 3:27
Cube of 4:64
Cube of 5:125
Cube of 6:216

示例 2:

下面的代码是通过发出 HTTP 请求在 Internet 上获取图像,我正在使用相同的请求库。代码的第一部分对 API 进行一对一调用,即下载很慢,而代码的第二部分使用线程来获取 API 进行并行请求。

您可以尝试上面讨论的所有各种参数,以查看它如何调整加速,例如,如果我将线程池设为 6 而不是 3,则加速更显着。

蟒蛇3

import requests
import time
import concurrent.futures
   
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
   
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
   
# Download images 1 by 1 => slow
for img in img_urls:
  download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')
   
print('*'*50)
   
t1 = time.perf_counter()
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
   
# Fetching images concurrently thus speeds up the download.
with concurrent.futures.ThreadPoolExecutor(3) as executor:
    executor.map(download_image, img_urls)
   
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')

输出:

Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Threaded Code Took :2.5529379630024778 seconds
**************************************************
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
MultiThreaded Code Took:0.5221083430078579 seconds