XiWind 西風之劍
HomeTechProContactGitHub
  • About
  • Git
    • Windows Terminal、PowerShell 安裝
    • Git 開始使用
    • Branch 入門
    • 合併多個 Commit , 編輯
    • 額外功能
  • deep learning
    • Dilated Convolution
  • Python
    • GIL 【全域直譯器鎖】
    • PyPy 【JIT 編譯器】
    • Decorator 【修飾器】
      • Class Decorators
  • Python library
    • abc 【抽象 Class】
      • ABC, ABCMeta
      • __abstractmethods__, get_cache_token, update_abstractmethods
    • dataclasses 【數據 Class】
      • make_dataclass(), replace(), is_dataclass(), __post_init__
    • enum 【列舉 Class】
      • Flag, auto(), unique, verify()
      • 範例
    • concurrent.futures 【執行緒、程序】
      • Future, Module Functions
    • queue 【佇列、同步】
      • full(), empty(), qsize(), join(), task_done()
    • functools 【可調用物件】
      • ordering、wrapper、partial
      • Overloading
    • heapq 【堆積佇列】
      • heapify(), merge(), nlargest(), nsmallest()
    • time 【時間】
      • time(), monotonic(), perf_counter()...
      • sleep(), 範例...
    • logging 【日誌】
Powered by GitBook
On this page
  • concurrent.futures
  • Executor
  • submit()
  • map()
  • shutdown()
  • ThreadPoolExecutor
  • 範例 – thread_name_prefix(參數)
  • 範例 – worker thread
  • ProcessPoolExecutor
  • 功能比較
  • 範例 – 基礎
  • 範例 – Process vs Thread

Was this helpful?

  1. Python library

concurrent.futures 【執行緒、程序】

Previous範例NextFuture, Module Functions

Last updated 1 year ago

Was this helpful?

concurrent.futures


該 module 用於 asynchronously(非同步地、異步地)執行程式碼。使用 abstract class 直接管理複雜的 thread 或 process,使您能夠專注於撰寫程式碼,而無需深入了解底層細節。用法上比使用 multiprocessing、threading、asyncio module 容易得多,是相當推薦的 built-in library。它提供了兩個主要的 Class:ThreadPoolExecutor 針對 Thread(執行緒),ProcessPoolExecutor 針對 Process(程序)。

使用 ThreadPoolExecutor 可以認為是 Concurrency,而 ProcessPoolExecutor 則可以當成 Parallelism。

ThreadPoolExecutor 會在同一個 Process 裡面做事。當遇到被阻塞時,會 hang-up 並且由另外一個 Thread 繼續做事。

ProcessPoolExecutor 會建立新的 Process 來處理,當遇到被阻塞時,不會hang-up 並且持續不斷的計算。對於這種狀況,使用 Process 就如同開影分身,能夠讓整體的計算更快完成。

Thread vs Process

Thread :程式執行任務的基本單位。

Process:啟動應用程式時產生的執行實體,需要一定的 CPU 與記憶體資源,Process 由一到多個 Thread 組成,同一個 Process 裡的 Thread 可以共用記憶體資源。

建立 thread 或是 process 都會花費時間成本,這些成本如果沒有辦法平均攤銷到計算裡面的話,就會變成阻力強大 overhead。

相較於 multiprocessing,concurrent.futures 提供了一組更高階的和更簡單的用法 (例如:submit(), map())。如果不關心非同步執行的細項時 (例如:barrier,lock ),使用 concurrent.futures 即可。

Executor


提供非同步執行調用方法的 abstract class。

submit()

用於在單獨的 thread 中非同步執行可調用的物件(函數或方法),並返回表示計算結果的 Future 物件。

PYTHON
submit(fn, *args, **kwargs)

fn:要非同步執行的可調用的物件。 *args:傳遞給可調用物件的 positional arguments。 **kwargs:傳遞給可調用物件的 keyword arguments。

範例 – 基礎 1

PYTHON
from concurrent import futures

def process_data(data):
    for x in range(data):
        print(x)
    return data ** 2

# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks
    future1 = executor.submit(process_data, 10)
    future2 = executor.submit(process_data, 11)

    # Wait results
    result1 = future1.result()
    result2 = future2.result()
    
    print(result1)
    print(result2)

執行結果:

0
1
2
0
1
2
3
4
5
6
7
8
9
10
11
3
4
5
6
7
8
9
10
121
144

範例 – 基礎 2

PYTHON
import concurrent.futures
import time

def task(index):
    print(f"Task {index} executed.")
    time.sleep(1)

# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

time_1 = time.time()

# Submit tasks
futures = []
for i in range(5):
    future = executor.submit(task, i)
    futures.append(future)

executor.shutdown()
print("All tasks have completed.")

time_2 = time.time()
print(f'{time_2 - time_1:.4f} s')

執行結果:

Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
All tasks have completed.
1.0043 s

範例 – 串接 task

PYTHON
import concurrent.futures

def task_a(data):
    result_a = data * 2
    return result_a

def task_b(data):
    result_b = data * 2
    return result_b

def task_c(data):
    result_c = data * 2
    return result_c

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    task_a = executor.submit(task_a, 2)
    task_b = executor.submit(task_b, task_a.result())
    task_c = executor.submit(task_c, task_b.result())

    print(task_a.result())
    print(task_b.result())
    print(task_c.result())

執行結果:

4
8
16

map()


返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。

PYTHON
map(func, 
    *iterables, 
    timeout=None, 
    chunksize=1)
參數
說明

func

調用的函數。如果 func 引發 exception 且從 iterator 檢索其值時,將引發該 exception。

*iterables

將 func 應用於 iterables 的每個項目。

timeout

等待結果的最大秒數。如果 timeout=None,將無限期阻塞,直到結果可用。

chunksize

當 map() 與 ProcessPoolExecutor 一起使用時,將輸入的 iterables 物件劃分為較小的 chunks(區塊)。 每個 chunk 作為獨立的任務提交,由多個 work process 同時處理。 此參數確定每個 chunk 中,將有多少 iterable 物件在一起。當您有很長的 iterables 物件時,使用較大的 chunksize 可以增強 map() 的性能。(減少Submit tasks 以及主要 process 和 work process 之間通訊的開銷)

as_completed() 和 map() 之間的主要區別在於 tasks 結果的順序。

as_completed():按 Submit tasks 完成的順序。 map():按 Submit tasks 順序。

範例 – 基礎

PYTHON
import time
import concurrent.futures

def some_task(n):
    time.sleep(1)
    return n * 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(some_task, i) for i in range(10)]
    
    # Use as_completed()
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")
        
    # Use map()
    inputs = [i for i in range(10)]
    results = executor.map(some_task, inputs)
    for result in results:
        print(f"Task result: {result}")

執行結果:

Task result: 2
Task result: 4
Task result: 0
Task result: 8
Task result: 10
Task result: 6
Task result: 14
Task result: 16
Task result: 12
Task result: 18

Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8
Task result: 10
Task result: 12
Task result: 14
Task result: 16
Task result: 18

範例 – Exception

PYTHON
from concurrent.futures import ThreadPoolExecutor

# 不會立即引發 Exception。
def divide(x, y):
    if y == 0:
        raise ZeroDivisionError("Cannot divide by zero")
    return x / y

# Create worker threads
with ThreadPoolExecutor() as executor:
    # Submit tasks
    results = executor.map(divide, [40, 30, 20, 10], [2, 2, 0, 2])

    # Retrieve results
    for result in results:
        try:
            print(result)
        except Exception as e:
            print(f"Exception occurred: {e}")

執行結果:

20.0
15.0
[line 15] ZeroDivisionError: Cannot divide by zero

範例 – chunksize(參數)

PYTHON
import time
import concurrent.futures

def measure_execution_time(func):
	def wrapper(*args, **kwargs):
		start_time = time.time()
		result = func(*args, **kwargs) # function callable
		end_time = time.time()
		print(f"{func.__name__} executed in {end_time - start_time:.3f} s.")
		return result
	return wrapper

def square(x):
	return x * 2

@measure_execution_time
def work_process(nums, chunksize):
	with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
		results = executor.map(square, nums, chunksize=chunksize)

if __name__ == '__main__':
	nums = range(1, 1000)  # 1000 tasks

	work_process(nums, 1)  # 0.374 s
	work_process(nums, 2)  # 0.227 s
	work_process(nums, 4)  # 0.172 s
	work_process(nums, 7)  # 0.150 s
	work_process(nums, 10) # 0.140 s
	work_process(nums, 15) # 0.134 s
	work_process(nums, 20) # 0.144 s
	work_process(nums, 30) # 0.137 s

shutdown()


指示不再提交任何任務並且釋放資源。

PYTHON
shutdown(wait=True, *, cancel_futures=False)

wait:是否應該阻塞,直到 pending futures 完成。 如果 wait=True ,將等待所有提交的任務完成後再返回。 如果 wait=False,將立即返回和釋放資源,而不等待任務完成。 cancel_futures:這個參數指定是否取消還沒有開始的 futures。如果cancel_futures=False,pending futures 將被允許在關閉之前完成。

使用 with 語句,將會關閉 Executor。等效 shutdown(wait=True, cancel_futures=False)

PYTHON
import time
import concurrent.futures

def task(i):
    print(i, 'Finish')
    
# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor()
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)

executor.shutdown()
PYTHON
import time
import concurrent.futures

def task(i):
    print(i, 'Finish')
    
# Create worker threads
with concurrent.futures.ThreadPoolExecutor() as executor:
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)

ThreadPoolExecutor


一個 Executor subclass,實現基於 Thread 的 Executor。 通常用於 I/O 密集型任務。

PYTHON
ThreadPoolExecutor(max_workers=None, 
                   thread_name_prefix='', 
                   initializer=None, 
                   initargs=())
參數
說明

max_workers

調用多少個 thread。 如果 max_workers=None ,將預設為 min(32, os.cpu_count()+4),並且保證至少max_workers=5。它最多使用32個 CPU 核心來執行釋放 GIL 的 CPU 綁定任務。

thread_name_prefix

控制 worker thread 的名稱。 這對於調試目的很有用,允許輕鬆識別 ThreadPool 中的 thread。

initializer

可調用物件,在每個 worker thread 開始時調用。如果 initializer 引發 exception,它將導致等待處理的作業引發 BrokenThreadPool Exception。

initargs

如果你提供了 initializer,你也可以提供一個 Tuple (initargs)。initargs 將被當作參數,傳遞給 initializer 函數。

ThreadPoolExecutor 通常用於處理涉及 I/O 操作的任務(例如:讀取或寫入文件、進行網絡請求、 interacting 數據庫)。

當任務花費大部分時間在等待 I/O 操作完成,而不是使用 CPU 資源時,ThreadPoolExecutor 特別有用。當一個 Thread 執行的 I/O 操作阻塞時,會釋放GIL,讓另一個Python線程執行。這表明 ThreadPoolExecutor 應該限於那些釋放 GIL 的任務。

ThreadPoolExecutor(worker) 應該要大於 ProcessPoolExecutor(worker)。

原因是 I/O 綁定任務可以從擁有更多 concurrent threads 中獲得幫助,因為 Thread 的大部分時間都花在等待 I/O 操作完成上。擁有更多的 worker thread有助於在重疊 I/O 操作時,最大限度地提高並發性和效率。

建議不應將 ThreadPoolExecutor 用於長時間運行的任務。

如果主要的 thread 遇到未處理的異常或意外終止,而沒有給 thread 完成的機會,則可能導致不可預測的行為和潛在的資源洩漏。 對於長時間運行的任務,使用 ProcessPoolExecutor 或 managing threads explicitly 等替代方法可能更好地控制任務的執行和終止。

如果在提交新任務時,Pool 中有可用的空閒 worker thread,它不會立即創建新的工作線程,而是首先重用空閒 worker thread 來執行新任務。即在創建 thread 之前,重用空閒 worker thread,來達到 max_workers 上限。

所有排隊的 worker thread 都將自動在直譯器退出之前加入,避免突然終止 thread 並可能留下未完成的工作。ThreadPoolExecutor 的內部退出處理程序優先於調用任何已註冊的退出處理程序(例如:atexit module)。

Python 中 GIL 的存在會影響 ThreadPoolExecutor。ThreadPoolExecutor 維護一個固定大小的 worker thread pool,支援並 concurrent 任務。但由於 CPython 中的 GIL 阻止多個 thread 同時執行 Python bytecode,意味著任務不會 parallel。

Concurrency:表示任務之間的順序獨立性,它們可以隨時或同時完成。 Parallel:是 concurrency 的一個子集,表示任務是同時執行的。

範例 – thread_name_prefix(參數)

PYTHON
import threading
import concurrent.futures

def task(i):
    thread_name = threading.current_thread().name
    print(i,"Executing task in thread:", thread_name)

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="WorkerThread", max_workers=5) as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(10)]

執行結果:

0 Executing task in thread: WorkerThread_0
1 Executing task in thread: WorkerThread_0
2 Executing task in thread: WorkerThread_1
3 Executing task in thread: WorkerThread_0
4 Executing task in thread: WorkerThread_2
6 Executing task in thread: WorkerThread_3
7 Executing task in thread: WorkerThread_0
5 Executing task in thread: WorkerThread_1
8 Executing task in thread: WorkerThread_4
9 Executing task in thread: WorkerThread_2

範例 – worker thread

PYTHON
import threading
import concurrent.futures

def task(i):
    thread_name = threading.current_thread().name
    print(i,"Executing task in thread:", thread_name)
    i = 0
    for _ in range(1000):
        i = i + 1
    print('Finish')
    
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="WorkerThread", max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(6)]

執行結果:

0 Executing task in thread: WorkerThread_0
Finish
1 Executing task in thread: WorkerThread_1
2 Executing task in thread: WorkerThread_0
Finish
Finish
4 Executing task in thread: WorkerThread_1
5 Executing task in thread: WorkerThread_0
Finish
Finish
3 Executing task in thread: WorkerThread_2
Finish

ProcessPoolExecutor


一個 Executor subclass,實現基於 process 的 Executor,通常用於 CPU 密集型任務。

PYTHON
ProcessPoolExecutor(max_workers=None, 
                    mp_context=None, 
                    initializer=None, 
                    initargs=(), 
                    max_tasks_per_child=None)
參數
說明

max_workers

調用多少個 process。 預設為設備上的邏輯處理器(processors)數量。 在 Windows 上,必須 max_workers <= 61。

mp_context

使用 multiprocessing context。是一項高級功能,對於大多數用法而言並不是必需的。

initializer

可調用物件,在每個 worker process 開始時調用。如果 initializer 引發 exception,它將導致等待處理的作業引發 BrokenProcessPool Exception。

initargs

如果你提供了 initializer,你也可以提供一個 Tuple (initargs)。initargs 將被當作參數,傳遞給 initializer 函數。

max_tasks_per_child

指定 worker process 在終止和替換之前可以處理的最大任務數量,也就是控制 lifetime(生命週期)。 在沒有 mp_context 參數的情況下,當指定 max_tasks_per_child 時,將使用 spawn 啟動方法。 spawn 方法創建一個新的 Python 直譯器 process ,並啟動其中的 worker process。spawn 方法與 fork 方法不兼容。

ProcessPoolExecutor Class 使用 multiprocessing module,使其能夠繞過 [[GIL 【全域直譯器鎖】]]。 但是,必須先對數據進行序列化(pickled)。 因此, ProcessPoolExecutor 只能執行和返回 picklable 物件。

__main__ 必須由 worker subprocesses 導入。這意味著 ProcessPoolExecutor 將無法在 interactive 直譯器中工作。此要求確保必要的代碼(函數定義和全域變數)可用於 worker process。也就是必須使用 if __name__ == '__main__': 。

將可調用物件(函數或方法)提交給 ProcessPoolExecutor 執行時,避免從可調用物件中,再次調用任何 Executor 或 Future 方法。 這樣做可能會導致執行停止並且無法取得進展的死鎖情況。

功能比較


功能

ThreadPoolExecutor

ProcessPoolExecutor

Execution Context

共享 memory

memory 相互獨立

Concurrency Model

Concurrency

Parallelism

Resource Utilization

CPU 密集型任務

I/O 密集型任務

Isolation and Safety

如果某個任務遇到異常或崩潰,不會影響其他任務或主要 process。

會影響同一 process 中的其他任務,從而導致更複雜的錯誤處理和調試。

Overhead and Startup Time

高效率

需要更多的 overhead 和啟動時間。啟動新的 process 需要複製整個 process memory。

Shared State and Synchronization

可以直接訪問和修改 process memory 中的共享數據,但必須謹慎對共享資源的訪問,以避免競爭條件。

共享數據需要明確的機制。

範例 – 基礎


PYTHON
import multiprocessing
import concurrent.futures

def task(i):
    return i * 2

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            print(future.result())
PYTHON
import concurrent.futures

def task(i):
    return i * 2

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            print(future.result())

if __name__ == '__main__':
    main()

範例 – Process vs Thread


PYTHON
import concurrent.futures
import time

# Fibonacci sequence (費波那契數、黃金分割數)
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def thread_function():
    time_1 = time.time()
    # Create a ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        # Submit tasks
        futures = [executor.submit(fib, i) for i in range(10, 37)]

        # Retrieve results
        results = [future.result() for future in futures]
    time_2 = time.time()
    print(f'{time_2 - time_1:.2} s')
    print(results)

def process_function():
    time_1 = time.time()
    # Create a ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        # Submit tasks
        futures = [executor.submit(fib, i) for i in range(10, 37)]

        # Retrieve results
        results = [future.result() for future in futures]
    time_2 = time.time()
    print(f'{time_2 - time_1:.2} s')
    print(results)


if __name__ == '__main__':
    thread_function()
    process_function()

執行結果:

5.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
2.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]

concurrent.futures module
Concurrency
Parallelism
Concurrency
Parallelism
Parallel vs Concurrency
concurrent.futures module
Concurrency
Parallelism
Concurrency
Parallelism
Parallel vs Concurrency
Page cover image