Page cover

Future


Future Class 封裝了可調用非同步執行的物件。 Future 實例由 submit() 建立,任務的回傳值是 Future 的實例,而不是在任務的結果。

result()

用於阻塞(block)和等待 Future 物件的完成,並檢索其結果。

PYTHON
result(timeout=None)

timeout:等待結果的最大秒數。如果timeout=None,將無限期阻塞,直到結果可用。如果超過timeout,則會引發 TimeoutError。

範例:

PYTHON
import time
from concurrent.futures import ThreadPoolExecutor

def long_running_task():
    print("Starting the long-running task...")
    time.sleep(2)
    print("Long-running task completed!")
    return "Result"

# Create worker threads
with ThreadPoolExecutor() as executor:
	# Submit tasks
    future = executor.submit(long_running_task)
    
	# Results
    result = future.result()
    print("Result:", result)

done()


用於檢查 Future 物件是否已經完成執行。它返回一個 bool,指示任務是否已經完成。 如果任務已經完成,返回 True; 否則返回 False

範例:

PYTHON
import concurrent.futures

def some_task(n):
    return n ** 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    future = executor.submit(some_task, 5)
    
    # Check task
    if future.done():
        result = future.result()
        print(f"Task result: {result}")
    else:
        print("Task is still running...")

add_done_callback()


允許您註冊一個回調函數,將會在任務完成時執行。 當您需要執行其他操作或在任務完成後處理任務結果時,此方法特別有用。

範例:

PYTHON
import concurrent.futures

def square(x):
    return x ** 2

# processing on the result
def process_result(future):
    result = future.result()
    print(result)

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    task1 = executor.submit(square, 5)
    task1.add_done_callback(process_result)

    task2 = executor.submit(square, 7)
    task2.add_done_callback(process_result)

running()


檢查任務是否正在執行且無法取消。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    if future.running():  # Check task
        print("Task is currently running.") # ✔️✔️
    else:
        print("Task is not running.")
PYTHON
import concurrent.futures
import time

def task(index):
    print(f"Task {index} executed.")
    time.sleep(1.5)

executor = concurrent.futures.ThreadPoolExecutor()

# Submit tasks
futures = []
for i in range(5):
    future = executor.submit(task, i)
    futures.append(future)

# Monitor running status
while any(f.running() for f in futures):
    print("Tasks are still running...")
    time.sleep(0.3)

print("All tasks have completed.")
executor.shutdown()

執行結果:

Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
All tasks have completed.

cancel()


嘗試取消任務。如果調用當前正在執行的任務或已完成的任務,則該方法將返回 False。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    if future.cancel():
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.") # ✔️✔️

cancelled()


檢查任務是否已被取消。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    future.cancel()
    if future.cancelled():  # Check task
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.") # ✔️✔️

exception()


返回調用任務所引發的 exception。

PYTHON
from concurrent import futures

def task():
    raise ValueError("Something went wrong!")

# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks
    future = executor.submit(task)
    
    result = future.exception()
    print(result)  # Output: Something went wrong!

as_completed()


返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。

PYTHON
as_completed(fs, timeout=None)

fs: iterable Future 物件

timeout: 等待結果的最大秒數。如果timeout=None,將無限期阻塞,直到結果可用。

as_completed()map() 之間的主要區別在於 tasks 結果的順序。

as_completed():按 Submit tasks 完成的順序。 map():按 Submit tasks 順序。

範例:

import time
import concurrent.futures

def some_task(n):
    time.sleep(1)
    return n * 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(some_task, i) for i in range(5)]
    
    # Use as_completed()
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")

    # No use as_completed()
    for future in futures:
        result = future.result()
        print(f"Task result: {result}")

執行結果:

Task result: 2
Task result: 4
Task result: 0
Task result: 6
Task result: 8

Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8

wait()


等待 fs 給出的 Future 實例完成。

PYTHON
wait(fs, 
     timeout=None, 
     return_when=ALL_COMPLETED)
參數
說明

fs

需要等待的 Future 實例的序列。 可以是一個或多個的 iterable 物件(例如:List, Set)。 這些 Futures 可以由不同的 Executor 實例創建。

timeout

等待結果的最大秒數。如果 timeout=None,將無限期阻塞,直到 Future 完成。

return_when

指定返回的條件。 "FIRST_COMPLETED":當任何的 Future 完成或被取消時,立即將 wait() 返回。 "ALL_COMPLETED":當所有的 Future 完成或被取消時,立即將 wait() 返回。 "FIRST_COMPLETED":當任何 Future 完成並引發 Exception 時,立即將 wait() 返回。如果沒有引發 Exception,則等同於 "ALL_COMPLETED"

範例 – FIRST_COMPLETED


PYTHON
import concurrent.futures
import time
def process_data(data):
    result = data
    time.sleep(data/3)
    return result

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 6)]

    # Wait futures
    done, not_done = concurrent.futures.wait(futures, return_when = 'FIRST_COMPLETED')

    # Retrieve results
    results = [future.result() for future in done]

print(results)

執行結果:

[1]

範例 – FIRST_EXCEPTION


PYTHON
import concurrent.futures
import time

def process_data(data):
    if data == 3:
        raise ValueError("Invalid input: 3")
    time.sleep(0.3)
    return data

# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 7)]

    # Wait futures
    done, not_done = concurrent.futures.wait(futures, 
                                             return_when = "FIRST_EXCEPTION")

    # Retrieve results
    results = []
    for future in done:
        try:
            result = future.result()
        except Exception as e:
            result = f"Error: {type(e).__name__} - {str(e)}"
        results.append(result)

print(results)

執行結果:

['Error: ValueError - Invalid input: 3', 1, 2]

參考資料


concurrent.futures — 啟動並行任務 — Python 3.11.4 文檔

multiprocessing — Process-based parallelism — Python 3.11.4 documentation

stoensin map – github gist

確定並發和並行的優勢 - 使用 Java 並發擴展您的代碼 - OpenClassrooms

ThreadPoolExecutor 與全局解釋器鎖 (GIL)

concurrent.futures 平行任務處理 - Python 教學 | STEAM 教育學習網

concurrent.futures — 創立非同步任務 — 你所不知道的 Python 標準函式庫用法 06 | louie_lu's blog

Python的GIL是什麼鬼,多線程性能研究究竟如何 • cenalulu's Tech Blog

並發與並行:有什麼區別? | 登錄半徑博客

08、多執行緒 和 多進程 :以斐波拉契亞數列計算為例,只需要切換一下類即可完成從多執行緒到多進程的轉化_鞍-的博客-CSDN博客

Python 好用模組教學 - concurrent.futures - MyApollo

並發執行 — Python 3.11.4 文檔

Last updated

Was this helpful?