concurrent.futures
該 module 用於 asynchronously(非同步地、異步地)執行程式碼。使用 abstract class 直接管理複雜的 thread 或 process,使您能夠專注於撰寫程式碼,而無需深入了解底層細節。用法上比使用 multiprocessing、threading、asyncio module 容易得多,是相當推薦的 built-in library。它提供了兩個主要的 Class:ThreadPoolExecutor
針對 Thread(執行緒),ProcessPoolExecutor
針對 Process(程序)。
使用 ThreadPoolExecutor
可以認為是 Concurrency,而 ProcessPoolExecutor
則可以當成 Parallelism。
ThreadPoolExecutor
會在同一個 Process 裡面做事。當遇到被阻塞時,會 hang-up 並且由另外一個 Thread 繼續做事。
ProcessPoolExecutor
會建立新的 Process 來處理,當遇到被阻塞時,不會hang-up 並且持續不斷的計算。對於這種狀況,使用 Process 就如同開影分身,能夠讓整體的計算更快完成。
Thread vs Process
Thread :程式執行任務的基本單位。
Process :啟動應用程式時產生的執行實體,需要一定的 CPU 與記憶體資源,Process 由一到多個 Thread 組成,同一個 Process 裡的 Thread 可以共用記憶體資源。
建立 thread 或是 process 都會花費時間成本,這些成本如果沒有辦法平均攤銷到計算裡面的話,就會變成阻力強大 overhead。
相較於 multiprocessing
,concurrent.futures
提供了一組更高階的和更簡單的用法 (例如:submit()
, map()
)。如果不關心非同步執行的細項時 (例如:barrier
,lock
),使用 concurrent.futures
即可。
Executor
提供非同步執行調用方法的 abstract class。
submit()
用於在單獨的 thread 中非同步執行可調用的物件(函數或方法),並返回表示計算結果的 Future 物件。
Copy submit (fn, * args, ** kwargs)
fn
:要非同步執行的可調用的物件。 *args
:傳遞給可調用物件的 positional arguments。 **kwargs
:傳遞給可調用物件的 keyword arguments。
範例 – 基礎 1
Copy from concurrent import futures
def process_data ( data ):
for x in range (data):
print (x)
return data ** 2
# Create worker threads
with futures . ThreadPoolExecutor (max_workers = 5 ) as executor :
# Submit tasks
future1 = executor . submit (process_data, 10 )
future2 = executor . submit (process_data, 11 )
# Wait results
result1 = future1 . result ()
result2 = future2 . result ()
print (result1)
print (result2)
執行結果:
Copy 0
1
2
0
1
2
3
4
5
6
7
8
9
10
11
3
4
5
6
7
8
9
10
121
144
範例 – 基礎 2
Copy import concurrent . futures
import time
def task ( index ):
print ( f "Task { index } executed." )
time . sleep ( 1 )
# Create worker threads
executor = concurrent . futures . ThreadPoolExecutor (max_workers = 5 )
time_1 = time . time ()
# Submit tasks
futures = []
for i in range ( 5 ):
future = executor . submit (task, i)
futures . append (future)
executor . shutdown ()
print ( "All tasks have completed." )
time_2 = time . time ()
print ( f ' { time_2 - time_1 :.4f } s' )
執行結果:
Copy Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
All tasks have completed.
1.0043 s
範例 – 串接 task
Copy import concurrent . futures
def task_a ( data ):
result_a = data * 2
return result_a
def task_b ( data ):
result_b = data * 2
return result_b
def task_c ( data ):
result_c = data * 2
return result_c
# Create worker threads
with concurrent . futures . ThreadPoolExecutor (max_workers = 3 ) as executor :
# Submit tasks
task_a = executor . submit (task_a, 2 )
task_b = executor . submit (task_b, task_a. result ())
task_c = executor . submit (task_c, task_b. result ())
print (task_a. result ())
print (task_b. result ())
print (task_c. result ())
執行結果:
map()
返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。
Copy map (func,
* iterables,
timeout = None ,
chunksize = 1 )
調用的函數。如果 func
引發 exception 且從 iterator 檢索其值時,將引發該 exception。
將 func
應用於 iterables
的每個項目。
等待結果的最大秒數。如果 timeout=None
,將無限期阻塞,直到結果可用。
當 map()
與 ProcessPoolExecutor
一起使用時,將輸入的 iterables 物件劃分為較小的 chunks(區塊)。
每個 chunk 作為獨立的任務提交,由多個 work process 同時處理。 此參數確定每個 chunk 中,將有多少 iterable 物件在一起。當您有很長的 iterables 物件時,使用較大的 chunksize
可以增強 map()
的性能。(減少Submit tasks 以及主要 process 和 work process 之間通訊的開銷)
as_completed()
和 map()
之間的主要區別在於 tasks 結果的順序。
as_completed()
:按 Submit tasks 完成的順序。
map()
:按 Submit tasks 順序。
範例 – 基礎
Copy import time
import concurrent . futures
def some_task ( n ):
time . sleep ( 1 )
return n * 2
# Create worker threads
with concurrent . futures . ThreadPoolExecutor (max_workers = 3 ) as executor :
# Submit tasks
futures = [executor . submit (some_task, i) for i in range ( 10 ) ]
# Use as_completed()
for future in concurrent . futures . as_completed (futures):
result = future . result ()
print ( f "Task result: { result } " )
# Use map()
inputs = [i for i in range ( 10 ) ]
results = executor . map (some_task, inputs)
for result in results :
print ( f "Task result: { result } " )
執行結果:
Copy Task result: 2
Task result: 4
Task result: 0
Task result: 8
Task result: 10
Task result: 6
Task result: 14
Task result: 16
Task result: 12
Task result: 18
Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8
Task result: 10
Task result: 12
Task result: 14
Task result: 16
Task result: 18
範例 – Exception
Copy from concurrent . futures import ThreadPoolExecutor
# 不會立即引發 Exception。
def divide ( x , y ):
if y == 0 :
raise ZeroDivisionError ( "Cannot divide by zero" )
return x / y
# Create worker threads
with ThreadPoolExecutor () as executor :
# Submit tasks
results = executor . map (divide, [ 40 , 30 , 20 , 10 ], [ 2 , 2 , 0 , 2 ])
# Retrieve results
for result in results :
try :
print (result)
except Exception as e :
print ( f "Exception occurred: { e } " )
執行結果:
Copy 20.0
15.0
[line 15] ZeroDivisionError: Cannot divide by zero
範例 – chunksize(參數)
Copy import time
import concurrent . futures
def measure_execution_time ( func ):
def wrapper ( * args , ** kwargs ):
start_time = time . time ()
result = func ( * args, ** kwargs) # function callable
end_time = time . time ()
print ( f " { func. __name__} executed in { end_time - start_time :.3f } s." )
return result
return wrapper
def square ( x ):
return x * 2
@measure_execution_time
def work_process ( nums , chunksize ):
with concurrent . futures . ProcessPoolExecutor (max_workers = 10 ) as executor :
results = executor . map (square, nums, chunksize = chunksize)
if __name__ == '__main__' :
nums = range ( 1 , 1000 ) # 1000 tasks
work_process (nums, 1 ) # 0.374 s
work_process (nums, 2 ) # 0.227 s
work_process (nums, 4 ) # 0.172 s
work_process (nums, 7 ) # 0.150 s
work_process (nums, 10 ) # 0.140 s
work_process (nums, 15 ) # 0.134 s
work_process (nums, 20 ) # 0.144 s
work_process (nums, 30 ) # 0.137 s
shutdown()
指示不再提交任何任務並且釋放資源。
Copy shutdown (wait = True , * , cancel_futures = False )
wait
:是否應該阻塞,直到 pending futures 完成。 如果 wait=True
,將等待所有提交的任務完成後再返回。 如果 wait=False
,將立即返回和釋放資源,而不等待任務完成。 cancel_futures
:這個參數指定是否取消還沒有開始的 futures。如果cancel_futures=False
,pending futures 將被允許在關閉之前完成。
使用 with 語句,將會關閉 Executor。等效 shutdown(wait=True, cancel_futures=False)
Copy import time
import concurrent . futures
def task ( i ):
print (i, 'Finish' )
# Create worker threads
executor = concurrent . futures . ThreadPoolExecutor ()
future1 = executor . submit (task, 1 )
future2 = executor . submit (task, 2 )
executor . shutdown ()
Copy import time
import concurrent . futures
def task ( i ):
print (i, 'Finish' )
# Create worker threads
with concurrent . futures . ThreadPoolExecutor () as executor :
future1 = executor . submit (task, 1 )
future2 = executor . submit (task, 2 )
ThreadPoolExecutor
一個 Executor subclass,實現基於 Thread 的 Executor。 通常用於 I/O 密集型任務。
Copy ThreadPoolExecutor (max_workers = None ,
thread_name_prefix = '' ,
initializer = None ,
initargs = ())
調用多少個 thread。
如果 max_workers=None
,將預設為 min(32, os.cpu_count()+4)
,並且保證至少max_workers=5
。它最多使用32個 CPU 核心來執行釋放 GIL 的 CPU 綁定任務。
控制 worker thread 的名稱。 這對於調試目的很有用,允許輕鬆識別 ThreadPool 中的 thread。
可調用物件,在每個 worker thread 開始時調用。如果 initializer
引發 exception,它將導致等待處理的作業引發 BrokenThreadPool
Exception。
如果你提供了 initializer
,你也可以提供一個 Tuple (initargs
)。initargs
將被當作參數,傳遞給 initializer
函數。
ThreadPoolExecutor
通常用於處理涉及 I/O 操作的任務(例如:讀取或寫入文件、進行網絡請求、 interacting 數據庫)。
當任務花費大部分時間在等待 I/O 操作完成,而不是使用 CPU 資源時,ThreadPoolExecutor
特別有用。當一個 Thread 執行的 I/O 操作阻塞時,會釋放GIL,讓另一個Python線程執行。這表明 ThreadPoolExecutor
應該限於那些釋放 GIL 的任務。
ThreadPoolExecutor(worker)
應該要大於 ProcessPoolExecutor(worker)
。
原因是 I/O 綁定任務可以從擁有更多 concurrent threads 中獲得幫助,因為 Thread 的大部分時間都花在等待 I/O 操作完成上。擁有更多的 worker thread有助於在重疊 I/O 操作時,最大限度地提高並發性和效率。
建議不應將 ThreadPoolExecutor
用於長時間運行的任務。
如果主要的 thread 遇到未處理的異常或意外終止,而沒有給 thread 完成的機會,則可能導致不可預測的行為和潛在的資源洩漏。 對於長時間運行的任務,使用 ProcessPoolExecutor
或 managing threads explicitly 等替代方法可能更好地控制任務的執行和終止。
如果在提交新任務時,Pool 中有可用的空閒 worker thread,它不會立即創建新的工作線程,而是首先重用空閒 worker thread 來執行新任務。即在創建 thread 之前,重用空閒 worker thread,來達到 max_workers
上限。
所有排隊的 worker thread 都將自動在直譯器退出之前加入,避免突然終止 thread 並可能留下未完成的工作。ThreadPoolExecutor
的內部退出處理程序優先於調用任何已註冊的退出處理程序(例如:atexit module)。
Python 中 GIL 的存在會影響 ThreadPoolExecutor
。ThreadPoolExecutor
維護一個固定大小的 worker thread pool,支援並 concurrent 任務。但由於 CPython 中的 GIL 阻止多個 thread 同時執行 Python bytecode,意味著任務不會 parallel。
Concurrency:表示任務之間的順序獨立性,它們可以隨時或同時完成。
Parallel:是 concurrency 的一個子集,表示任務是同時執行的。
範例 – thread_name_prefix(參數)
Copy import threading
import concurrent . futures
def task ( i ):
thread_name = threading . current_thread (). name
print (i, "Executing task in thread:" , thread_name)
# Create worker threads
with concurrent . futures . ThreadPoolExecutor (
thread_name_prefix = "WorkerThread" , max_workers = 5 ) as executor :
# Submit tasks
futures = [executor . submit (task, i) for i in range ( 10 ) ]
執行結果:
Copy 0 Executing task in thread: WorkerThread_0
1 Executing task in thread: WorkerThread_0
2 Executing task in thread: WorkerThread_1
3 Executing task in thread: WorkerThread_0
4 Executing task in thread: WorkerThread_2
6 Executing task in thread: WorkerThread_3
7 Executing task in thread: WorkerThread_0
5 Executing task in thread: WorkerThread_1
8 Executing task in thread: WorkerThread_4
9 Executing task in thread: WorkerThread_2
範例 – worker thread
Copy import threading
import concurrent . futures
def task ( i ):
thread_name = threading . current_thread (). name
print (i, "Executing task in thread:" , thread_name)
i = 0
for _ in range ( 1000 ):
i = i + 1
print ( 'Finish' )
# Create worker threads
with concurrent . futures . ThreadPoolExecutor (
thread_name_prefix = "WorkerThread" , max_workers = 3 ) as executor :
# Submit tasks
futures = [executor . submit (task, i) for i in range ( 6 ) ]
執行結果:
Copy 0 Executing task in thread: WorkerThread_0
Finish
1 Executing task in thread: WorkerThread_1
2 Executing task in thread: WorkerThread_0
Finish
Finish
4 Executing task in thread: WorkerThread_1
5 Executing task in thread: WorkerThread_0
Finish
Finish
3 Executing task in thread: WorkerThread_2
Finish
ProcessPoolExecutor
一個 Executor subclass,實現基於 process 的 Executor,通常用於 CPU 密集型任務。
Copy ProcessPoolExecutor (max_workers = None ,
mp_context = None ,
initializer = None ,
initargs = (),
max_tasks_per_child = None )
調用多少個 process。
預設為設備上的邏輯處理器(processors)數量。 在 Windows 上,必須 max_workers <= 61
。
使用 multiprocessing context。是一項高級功能,對於大多數用法而言並不是必需的。
可調用物件,在每個 worker process 開始時調用。如果 initializer
引發 exception,它將導致等待處理的作業引發 BrokenProcessPool
Exception。
如果你提供了 initializer
,你也可以提供一個 Tuple (initargs
)。initargs
將被當作參數,傳遞給 initializer
函數。
指定 worker process 在終止和替換之前可以處理的最大任務數量,也就是控制 lifetime(生命週期)。
在沒有 mp_context
參數的情況下,當指定 max_tasks_per_child
時,將使用 spawn
啟動方法。 spawn
方法創建一個新的 Python 直譯器 process ,並啟動其中的 worker process。spawn
方法與 fork
方法不兼容。
ProcessPoolExecutor
Class 使用 multiprocessing
module,使其能夠繞過 [[GIL 【全域直譯器鎖】]]。 但是,必須先對數據進行序列化(pickled)。 因此, ProcessPoolExecutor
只能執行和返回 picklable 物件。
__main__
必須由 worker subprocesses 導入。這意味著 ProcessPoolExecutor
將無法在 interactive 直譯器中工作。此要求確保必要的代碼(函數定義和全域變數)可用於 worker process。也就是必須使用 if __name__ == '__main__':
。
將可調用物件(函數或方法)提交給 ProcessPoolExecutor
執行時,避免從可調用物件中,再次調用任何 Executor
或 Future
方法。 這樣做可能會導致執行停止並且無法取得進展的死鎖情況。
功能比較
如果某個任務遇到異常或崩潰,不會影響其他任務或主要 process。
會影響同一 process 中的其他任務,從而導致更複雜的錯誤處理和調試。
Overhead and Startup Time
需要更多的 overhead 和啟動時間。啟動新的 process 需要複製整個 process memory。
Shared State and Synchronization
可以直接訪問和修改 process memory 中的共享數據,但必須謹慎對共享資源的訪問,以避免競爭條件。
範例 – 基礎
Copy import multiprocessing
import concurrent . futures
def task ( i ):
return i * 2
if __name__ == '__main__' :
with concurrent . futures . ProcessPoolExecutor (max_workers = 4 ) as executor :
futures = [executor . submit (task, i) for i in range ( 6 ) ]
for future in futures :
print (future. result ())
Copy import concurrent . futures
def task ( i ):
return i * 2
def main ():
with concurrent . futures . ProcessPoolExecutor (max_workers = 4 ) as executor :
futures = [executor . submit (task, i) for i in range ( 6 ) ]
for future in futures :
print (future. result ())
if __name__ == '__main__' :
main ()
範例 – Process vs Thread
Copy import concurrent . futures
import time
# Fibonacci sequence (費波那契數、黃金分割數)
def fib ( n ):
if n <= 2 :
return 1
return fib (n - 1 ) + fib (n - 2 )
def thread_function ():
time_1 = time . time ()
# Create a ThreadPoolExecutor
with concurrent . futures . ThreadPoolExecutor (max_workers = 8 ) as executor :
# Submit tasks
futures = [executor . submit (fib, i) for i in range ( 10 , 37 ) ]
# Retrieve results
results = [future . result () for future in futures]
time_2 = time . time ()
print ( f ' { time_2 - time_1 :.2 } s' )
print (results)
def process_function ():
time_1 = time . time ()
# Create a ThreadPoolExecutor
with concurrent . futures . ProcessPoolExecutor (max_workers = 8 ) as executor :
# Submit tasks
futures = [executor . submit (fib, i) for i in range ( 10 , 37 ) ]
# Retrieve results
results = [future . result () for future in futures]
time_2 = time . time ()
print ( f ' { time_2 - time_1 :.2 } s' )
print (results)
if __name__ == '__main__' :
thread_function ()
process_function ()
執行結果:
Copy 5.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
2.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]