
Future
Future Class 封裝了可調用非同步執行的物件。 Future 實例由 submit() 建立,任務的回傳值是 Future 的實例,而不是在任務的結果。
result()
用於阻塞(block)和等待 Future 物件的完成,並檢索其結果。
result(timeout=None)
timeout
:等待結果的最大秒數。如果timeout=None
,將無限期阻塞,直到結果可用。如果超過timeout
,則會引發 TimeoutError。
如果Future 物件需要很長的計算時間,阻塞 main threads 或其他關鍵 threads,則可能會導致程序性能和響應能力下降。
範例:
import time
from concurrent.futures import ThreadPoolExecutor
def long_running_task():
print("Starting the long-running task...")
time.sleep(2)
print("Long-running task completed!")
return "Result"
# Create worker threads
with ThreadPoolExecutor() as executor:
# Submit tasks
future = executor.submit(long_running_task)
# Results
result = future.result()
print("Result:", result)
done()
用於檢查 Future 物件是否已經完成執行。它返回一個 bool,指示任務是否已經完成。 如果任務已經完成,返回 True
; 否則返回 False
。
範例:
import concurrent.futures
def some_task(n):
return n ** 2
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
future = executor.submit(some_task, 5)
# Check task
if future.done():
result = future.result()
print(f"Task result: {result}")
else:
print("Task is still running...")
add_done_callback()
允許您註冊一個回調函數,將會在任務完成時執行。 當您需要執行其他操作或在任務完成後處理任務結果時,此方法特別有用。
考慮 add_done_callback()
潛在的阻塞或耗時操作很重要,因為它們會影響 thread 的性能。 如果您需要執行大量處理或耗時的操作,將這些任務卸載到單獨的 worker threads or processes 可能會更有效。
範例:
import concurrent.futures
def square(x):
return x ** 2
# processing on the result
def process_result(future):
result = future.result()
print(result)
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
task1 = executor.submit(square, 5)
task1.add_done_callback(process_result)
task2 = executor.submit(square, 7)
task2.add_done_callback(process_result)
running()
檢查任務是否正在執行且無法取消。
import time
import concurrent.futures
def task():
time.sleep(1)
print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(task)
if future.running(): # Check task
print("Task is currently running.") # ✔️✔️
else:
print("Task is not running.")
import concurrent.futures
import time
def task(index):
print(f"Task {index} executed.")
time.sleep(1.5)
executor = concurrent.futures.ThreadPoolExecutor()
# Submit tasks
futures = []
for i in range(5):
future = executor.submit(task, i)
futures.append(future)
# Monitor running status
while any(f.running() for f in futures):
print("Tasks are still running...")
time.sleep(0.3)
print("All tasks have completed.")
executor.shutdown()
執行結果:
Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
All tasks have completed.
cancel()
嘗試取消任務。如果調用當前正在執行的任務或已完成的任務,則該方法將返回 False。
import time
import concurrent.futures
def task():
time.sleep(1)
print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(task)
if future.cancel():
print("Task cancelled.")
else:
print("Task could not be cancelled.") # ✔️✔️
cancelled()
檢查任務是否已被取消。
import time
import concurrent.futures
def task():
time.sleep(1)
print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(task)
future.cancel()
if future.cancelled(): # Check task
print("Task cancelled.")
else:
print("Task could not be cancelled.") # ✔️✔️
exception()
返回調用任務所引發的 exception。
from concurrent import futures
def task():
raise ValueError("Something went wrong!")
# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
# Submit tasks
future = executor.submit(task)
result = future.exception()
print(result) # Output: Something went wrong!
as_completed()
返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。
as_completed(fs, timeout=None)
fs
: iterable Future 物件
timeout
: 等待結果的最大秒數。如果timeout=None
,將無限期阻塞,直到結果可用。
範例:
import time
import concurrent.futures
def some_task(n):
time.sleep(1)
return n * 2
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
futures = [executor.submit(some_task, i) for i in range(5)]
# Use as_completed()
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Task result: {result}")
# No use as_completed()
for future in futures:
result = future.result()
print(f"Task result: {result}")
執行結果:
Task result: 2
Task result: 4
Task result: 0
Task result: 6
Task result: 8
Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8
wait()
等待 fs
給出的 Future 實例完成。
wait(fs,
timeout=None,
return_when=ALL_COMPLETED)
fs
需要等待的 Future 實例的序列。 可以是一個或多個的 iterable 物件(例如:List, Set)。 這些 Futures 可以由不同的 Executor 實例創建。
timeout
等待結果的最大秒數。如果 timeout=None
,將無限期阻塞,直到 Future 完成。
return_when
指定返回的條件。
"FIRST_COMPLETED"
:當任何的 Future 完成或被取消時,立即將 wait()
返回。
"ALL_COMPLETED"
:當所有的 Future 完成或被取消時,立即將 wait()
返回。
"FIRST_COMPLETED"
:當任何 Future 完成並引發 Exception 時,立即將 wait()
返回。如果沒有引發 Exception,則等同於 "ALL_COMPLETED"
。
範例 – FIRST_COMPLETED
import concurrent.futures
import time
def process_data(data):
result = data
time.sleep(data/3)
return result
# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit tasks
futures = [executor.submit(process_data, i) for i in range(1, 6)]
# Wait futures
done, not_done = concurrent.futures.wait(futures, return_when = 'FIRST_COMPLETED')
# Retrieve results
results = [future.result() for future in done]
print(results)
執行結果:
[1]
範例 – FIRST_EXCEPTION
import concurrent.futures
import time
def process_data(data):
if data == 3:
raise ValueError("Invalid input: 3")
time.sleep(0.3)
return data
# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit tasks
futures = [executor.submit(process_data, i) for i in range(1, 7)]
# Wait futures
done, not_done = concurrent.futures.wait(futures,
return_when = "FIRST_EXCEPTION")
# Retrieve results
results = []
for future in done:
try:
result = future.result()
except Exception as e:
result = f"Error: {type(e).__name__} - {str(e)}"
results.append(result)
print(results)
執行結果:
['Error: ValueError - Invalid input: 3', 1, 2]
參考資料
concurrent.futures — 啟動並行任務 — Python 3.11.4 文檔
multiprocessing — Process-based parallelism — Python 3.11.4 documentation
確定並發和並行的優勢 - 使用 Java 並發擴展您的代碼 - OpenClassrooms
ThreadPoolExecutor 與全局解釋器鎖 (GIL)
concurrent.futures 平行任務處理 - Python 教學 | STEAM 教育學習網
concurrent.futures — 創立非同步任務 — 你所不知道的 Python 標準函式庫用法 06 | louie_lu's blog
Python的GIL是什麼鬼,多線程性能研究究竟如何 • cenalulu's Tech Blog
08、多執行緒 和 多進程 :以斐波拉契亞數列計算為例,只需要切換一下類即可完成從多執行緒到多進程的轉化_鞍-的博客-CSDN博客
Last updated
Was this helpful?