
concurrent.futures 【執行緒、程序】
concurrent.futures
該 module 用於 asynchronously(非同步地、異步地)執行程式碼。使用 abstract class 直接管理複雜的 thread 或 process,使您能夠專注於撰寫程式碼,而無需深入了解底層細節。用法上比使用 multiprocessing、threading、asyncio module 容易得多,是相當推薦的 built-in library。它提供了兩個主要的 Class:ThreadPoolExecutor
針對 Thread(執行緒),ProcessPoolExecutor
針對 Process(程序)。

使用 ThreadPoolExecutor
可以認為是 Concurrency,而 ProcessPoolExecutor
則可以當成 Parallelism。




ThreadPoolExecutor
會在同一個 Process 裡面做事。當遇到被阻塞時,會 hang-up 並且由另外一個 Thread 繼續做事。
ProcessPoolExecutor
會建立新的 Process 來處理,當遇到被阻塞時,不會hang-up 並且持續不斷的計算。對於這種狀況,使用 Process 就如同開影分身,能夠讓整體的計算更快完成。
Thread vs Process
Thread :程式執行任務的基本單位。
Process:啟動應用程式時產生的執行實體,需要一定的 CPU 與記憶體資源,Process 由一到多個 Thread 組成,同一個 Process 裡的 Thread 可以共用記憶體資源。
Executor
提供非同步執行調用方法的 abstract class。
submit()
用於在單獨的 thread 中非同步執行可調用的物件(函數或方法),並返回表示計算結果的 Future 物件。
submit(fn, *args, **kwargs)
fn
:要非同步執行的可調用的物件。 *args
:傳遞給可調用物件的 positional arguments。 **kwargs
:傳遞給可調用物件的 keyword arguments。
範例 – 基礎 1
from concurrent import futures
def process_data(data):
for x in range(data):
print(x)
return data ** 2
# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
# Submit tasks
future1 = executor.submit(process_data, 10)
future2 = executor.submit(process_data, 11)
# Wait results
result1 = future1.result()
result2 = future2.result()
print(result1)
print(result2)
執行結果:
0
1
2
0
1
2
3
4
5
6
7
8
9
10
11
3
4
5
6
7
8
9
10
121
144
範例 – 基礎 2
import concurrent.futures
import time
def task(index):
print(f"Task {index} executed.")
time.sleep(1)
# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
time_1 = time.time()
# Submit tasks
futures = []
for i in range(5):
future = executor.submit(task, i)
futures.append(future)
executor.shutdown()
print("All tasks have completed.")
time_2 = time.time()
print(f'{time_2 - time_1:.4f} s')
執行結果:
Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
All tasks have completed.
1.0043 s
範例 – 串接 task
import concurrent.futures
def task_a(data):
result_a = data * 2
return result_a
def task_b(data):
result_b = data * 2
return result_b
def task_c(data):
result_c = data * 2
return result_c
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
task_a = executor.submit(task_a, 2)
task_b = executor.submit(task_b, task_a.result())
task_c = executor.submit(task_c, task_b.result())
print(task_a.result())
print(task_b.result())
print(task_c.result())
執行結果:
4
8
16
map()
返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。
map(func,
*iterables,
timeout=None,
chunksize=1)
func
調用的函數。如果 func
引發 exception 且從 iterator 檢索其值時,將引發該 exception。
*iterables
將 func
應用於 iterables
的每個項目。
timeout
等待結果的最大秒數。如果 timeout=None
,將無限期阻塞,直到結果可用。
chunksize
當 map()
與 ProcessPoolExecutor
一起使用時,將輸入的 iterables 物件劃分為較小的 chunks(區塊)。
每個 chunk 作為獨立的任務提交,由多個 work process 同時處理。 此參數確定每個 chunk 中,將有多少 iterable 物件在一起。當您有很長的 iterables 物件時,使用較大的 chunksize
可以增強 map()
的性能。(減少Submit tasks 以及主要 process 和 work process 之間通訊的開銷)
範例 – 基礎
import time
import concurrent.futures
def some_task(n):
time.sleep(1)
return n * 2
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
futures = [executor.submit(some_task, i) for i in range(10)]
# Use as_completed()
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Task result: {result}")
# Use map()
inputs = [i for i in range(10)]
results = executor.map(some_task, inputs)
for result in results:
print(f"Task result: {result}")
執行結果:
Task result: 2
Task result: 4
Task result: 0
Task result: 8
Task result: 10
Task result: 6
Task result: 14
Task result: 16
Task result: 12
Task result: 18
Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8
Task result: 10
Task result: 12
Task result: 14
Task result: 16
Task result: 18
範例 – Exception
from concurrent.futures import ThreadPoolExecutor
# 不會立即引發 Exception。
def divide(x, y):
if y == 0:
raise ZeroDivisionError("Cannot divide by zero")
return x / y
# Create worker threads
with ThreadPoolExecutor() as executor:
# Submit tasks
results = executor.map(divide, [40, 30, 20, 10], [2, 2, 0, 2])
# Retrieve results
for result in results:
try:
print(result)
except Exception as e:
print(f"Exception occurred: {e}")
執行結果:
20.0
15.0
[line 15] ZeroDivisionError: Cannot divide by zero
範例 – chunksize(參數)
import time
import concurrent.futures
def measure_execution_time(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs) # function callable
end_time = time.time()
print(f"{func.__name__} executed in {end_time - start_time:.3f} s.")
return result
return wrapper
def square(x):
return x * 2
@measure_execution_time
def work_process(nums, chunksize):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
results = executor.map(square, nums, chunksize=chunksize)
if __name__ == '__main__':
nums = range(1, 1000) # 1000 tasks
work_process(nums, 1) # 0.374 s
work_process(nums, 2) # 0.227 s
work_process(nums, 4) # 0.172 s
work_process(nums, 7) # 0.150 s
work_process(nums, 10) # 0.140 s
work_process(nums, 15) # 0.134 s
work_process(nums, 20) # 0.144 s
work_process(nums, 30) # 0.137 s
shutdown()
指示不再提交任何任務並且釋放資源。
shutdown(wait=True, *, cancel_futures=False)
wait
:是否應該阻塞,直到 pending futures 完成。 如果 wait=True
,將等待所有提交的任務完成後再返回。 如果 wait=False
,將立即返回和釋放資源,而不等待任務完成。 cancel_futures
:這個參數指定是否取消還沒有開始的 futures。如果cancel_futures=False
,pending futures 將被允許在關閉之前完成。
import time
import concurrent.futures
def task(i):
print(i, 'Finish')
# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor()
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)
executor.shutdown()
import time
import concurrent.futures
def task(i):
print(i, 'Finish')
# Create worker threads
with concurrent.futures.ThreadPoolExecutor() as executor:
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)
ThreadPoolExecutor
一個 Executor subclass,實現基於 Thread 的 Executor。 通常用於 I/O 密集型任務。
ThreadPoolExecutor(max_workers=None,
thread_name_prefix='',
initializer=None,
initargs=())
max_workers
調用多少個 thread。
如果 max_workers=None
,將預設為 min(32, os.cpu_count()+4)
,並且保證至少max_workers=5
。它最多使用32個 CPU 核心來執行釋放 GIL 的 CPU 綁定任務。
thread_name_prefix
控制 worker thread 的名稱。 這對於調試目的很有用,允許輕鬆識別 ThreadPool 中的 thread。
initializer
可調用物件,在每個 worker thread 開始時調用。如果 initializer
引發 exception,它將導致等待處理的作業引發 BrokenThreadPool
Exception。
initargs
如果你提供了 initializer
,你也可以提供一個 Tuple (initargs
)。initargs
將被當作參數,傳遞給 initializer
函數。
如果在提交新任務時,Pool 中有可用的空閒 worker thread,它不會立即創建新的工作線程,而是首先重用空閒 worker thread 來執行新任務。即在創建 thread 之前,重用空閒 worker thread,來達到 max_workers
上限。
所有排隊的 worker thread 都將自動在直譯器退出之前加入,避免突然終止 thread 並可能留下未完成的工作。ThreadPoolExecutor
的內部退出處理程序優先於調用任何已註冊的退出處理程序(例如:atexit module)。
Python 中 GIL 的存在會影響 ThreadPoolExecutor
。ThreadPoolExecutor
維護一個固定大小的 worker thread pool,支援並 concurrent 任務。但由於 CPython 中的 GIL 阻止多個 thread 同時執行 Python bytecode,意味著任務不會 parallel。
Concurrency:表示任務之間的順序獨立性,它們可以隨時或同時完成。 Parallel:是 concurrency 的一個子集,表示任務是同時執行的。

範例 – thread_name_prefix(參數)
import threading
import concurrent.futures
def task(i):
thread_name = threading.current_thread().name
print(i,"Executing task in thread:", thread_name)
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
thread_name_prefix="WorkerThread", max_workers=5) as executor:
# Submit tasks
futures = [executor.submit(task, i) for i in range(10)]
執行結果:
0 Executing task in thread: WorkerThread_0
1 Executing task in thread: WorkerThread_0
2 Executing task in thread: WorkerThread_1
3 Executing task in thread: WorkerThread_0
4 Executing task in thread: WorkerThread_2
6 Executing task in thread: WorkerThread_3
7 Executing task in thread: WorkerThread_0
5 Executing task in thread: WorkerThread_1
8 Executing task in thread: WorkerThread_4
9 Executing task in thread: WorkerThread_2
範例 – worker thread
import threading
import concurrent.futures
def task(i):
thread_name = threading.current_thread().name
print(i,"Executing task in thread:", thread_name)
i = 0
for _ in range(1000):
i = i + 1
print('Finish')
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
thread_name_prefix="WorkerThread", max_workers=3) as executor:
# Submit tasks
futures = [executor.submit(task, i) for i in range(6)]
執行結果:
0 Executing task in thread: WorkerThread_0
Finish
1 Executing task in thread: WorkerThread_1
2 Executing task in thread: WorkerThread_0
Finish
Finish
4 Executing task in thread: WorkerThread_1
5 Executing task in thread: WorkerThread_0
Finish
Finish
3 Executing task in thread: WorkerThread_2
Finish
ProcessPoolExecutor
一個 Executor subclass,實現基於 process 的 Executor,通常用於 CPU 密集型任務。
ProcessPoolExecutor(max_workers=None,
mp_context=None,
initializer=None,
initargs=(),
max_tasks_per_child=None)
max_workers
調用多少個 process。
預設為設備上的邏輯處理器(processors)數量。 在 Windows 上,必須 max_workers <= 61
。
mp_context
使用 multiprocessing context。是一項高級功能,對於大多數用法而言並不是必需的。
initializer
可調用物件,在每個 worker process 開始時調用。如果 initializer
引發 exception,它將導致等待處理的作業引發 BrokenProcessPool
Exception。
initargs
如果你提供了 initializer
,你也可以提供一個 Tuple (initargs
)。initargs
將被當作參數,傳遞給 initializer
函數。
max_tasks_per_child
指定 worker process 在終止和替換之前可以處理的最大任務數量,也就是控制 lifetime(生命週期)。
在沒有 mp_context
參數的情況下,當指定 max_tasks_per_child
時,將使用 spawn
啟動方法。 spawn
方法創建一個新的 Python 直譯器 process ,並啟動其中的 worker process。spawn
方法與 fork
方法不兼容。
ProcessPoolExecutor
Class 使用 multiprocessing
module,使其能夠繞過 [[GIL 【全域直譯器鎖】]]。 但是,必須先對數據進行序列化(pickled)。 因此, ProcessPoolExecutor
只能執行和返回 picklable 物件。
__main__
必須由 worker subprocesses 導入。這意味著 ProcessPoolExecutor
將無法在 interactive 直譯器中工作。此要求確保必要的代碼(函數定義和全域變數)可用於 worker process。也就是必須使用 if __name__ == '__main__':
。
將可調用物件(函數或方法)提交給 ProcessPoolExecutor
執行時,避免從可調用物件中,再次調用任何 Executor
或 Future
方法。 這樣做可能會導致執行停止並且無法取得進展的死鎖情況。
功能比較
功能
ThreadPoolExecutor
ProcessPoolExecutor
Execution Context
共享 memory
memory 相互獨立
Concurrency Model
Concurrency
Parallelism
Resource Utilization
CPU 密集型任務
I/O 密集型任務
Isolation and Safety
如果某個任務遇到異常或崩潰,不會影響其他任務或主要 process。
會影響同一 process 中的其他任務,從而導致更複雜的錯誤處理和調試。
Overhead and Startup Time
高效率
需要更多的 overhead 和啟動時間。啟動新的 process 需要複製整個 process memory。
Shared State and Synchronization
可以直接訪問和修改 process memory 中的共享數據,但必須謹慎對共享資源的訪問,以避免競爭條件。
共享數據需要明確的機制。
範例 – 基礎
import multiprocessing
import concurrent.futures
def task(i):
return i * 2
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(6)]
for future in futures:
print(future.result())
import concurrent.futures
def task(i):
return i * 2
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(6)]
for future in futures:
print(future.result())
if __name__ == '__main__':
main()
範例 – Process vs Thread
import concurrent.futures
import time
# Fibonacci sequence (費波那契數、黃金分割數)
def fib(n):
if n <= 2:
return 1
return fib(n-1) + fib(n-2)
def thread_function():
time_1 = time.time()
# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
# Submit tasks
futures = [executor.submit(fib, i) for i in range(10, 37)]
# Retrieve results
results = [future.result() for future in futures]
time_2 = time.time()
print(f'{time_2 - time_1:.2} s')
print(results)
def process_function():
time_1 = time.time()
# Create a ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
# Submit tasks
futures = [executor.submit(fib, i) for i in range(10, 37)]
# Retrieve results
results = [future.result() for future in futures]
time_2 = time.time()
print(f'{time_2 - time_1:.2} s')
print(results)
if __name__ == '__main__':
thread_function()
process_function()
執行結果:
5.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
2.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
Last updated
Was this helpful?