XiWind 西風之劍
HomeTechProContactGitHub
  • About
  • Git
    • Windows Terminal、PowerShell 安裝
    • Git 開始使用
    • Branch 入門
    • 合併多個 Commit , 編輯
    • 額外功能
  • deep learning
    • Dilated Convolution
  • Python
    • GIL 【全域直譯器鎖】
    • PyPy 【JIT 編譯器】
    • Decorator 【修飾器】
      • Class Decorators
  • Python library
    • abc 【抽象 Class】
      • ABC, ABCMeta
      • __abstractmethods__, get_cache_token, update_abstractmethods
    • dataclasses 【數據 Class】
      • make_dataclass(), replace(), is_dataclass(), __post_init__
    • enum 【列舉 Class】
      • Flag, auto(), unique, verify()
      • 範例
    • concurrent.futures 【執行緒、程序】
      • Future, Module Functions
    • queue 【佇列、同步】
      • full(), empty(), qsize(), join(), task_done()
    • functools 【可調用物件】
      • ordering、wrapper、partial
      • Overloading
    • heapq 【堆積佇列】
      • heapify(), merge(), nlargest(), nsmallest()
    • time 【時間】
      • time(), monotonic(), perf_counter()...
      • sleep(), 範例...
    • logging 【日誌】
Powered by GitBook
On this page
  • Future
  • result()
  • done()
  • add_done_callback()
  • running()
  • cancel()
  • cancelled()
  • exception()
  • as_completed()
  • wait()
  • 範例 – FIRST_COMPLETED
  • 範例 – FIRST_EXCEPTION
  • 參考資料

Was this helpful?

  1. Python library
  2. concurrent.futures 【執行緒、程序】
Previousconcurrent.futures 【執行緒、程序】Nextqueue 【佇列、同步】

Last updated 1 year ago

Was this helpful?

Future


Future Class 封裝了可調用非同步執行的物件。 Future 實例由 建立,任務的回傳值是 Future 的實例,而不是在任務的結果。

result()

用於阻塞(block)和等待 Future 物件的完成,並檢索其結果。

PYTHON
result(timeout=None)

timeout:等待結果的最大秒數。如果timeout=None,將無限期阻塞,直到結果可用。如果超過timeout,則會引發 TimeoutError。

如果Future 物件需要很長的計算時間,阻塞 main threads 或其他關鍵 threads,則可能會導致程序性能和響應能力下降。

範例:

PYTHON
import time
from concurrent.futures import ThreadPoolExecutor

def long_running_task():
    print("Starting the long-running task...")
    time.sleep(2)
    print("Long-running task completed!")
    return "Result"

# Create worker threads
with ThreadPoolExecutor() as executor:
	# Submit tasks
    future = executor.submit(long_running_task)
    
	# Results
    result = future.result()
    print("Result:", result)

done()


用於檢查 Future 物件是否已經完成執行。它返回一個 bool,指示任務是否已經完成。 如果任務已經完成,返回 True; 否則返回 False。

範例:

PYTHON
import concurrent.futures

def some_task(n):
    return n ** 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    future = executor.submit(some_task, 5)
    
    # Check task
    if future.done():
        result = future.result()
        print(f"Task result: {result}")
    else:
        print("Task is still running...")

add_done_callback()


允許您註冊一個回調函數,將會在任務完成時執行。 當您需要執行其他操作或在任務完成後處理任務結果時,此方法特別有用。

考慮 add_done_callback() 潛在的阻塞或耗時操作很重要,因為它們會影響 thread 的性能。 如果您需要執行大量處理或耗時的操作,將這些任務卸載到單獨的 worker threads or processes 可能會更有效。

範例:

PYTHON
import concurrent.futures

def square(x):
    return x ** 2

# processing on the result
def process_result(future):
    result = future.result()
    print(result)

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    task1 = executor.submit(square, 5)
    task1.add_done_callback(process_result)

    task2 = executor.submit(square, 7)
    task2.add_done_callback(process_result)

running()


檢查任務是否正在執行且無法取消。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    if future.running():  # Check task
        print("Task is currently running.") # ✔️✔️
    else:
        print("Task is not running.")
PYTHON
import concurrent.futures
import time

def task(index):
    print(f"Task {index} executed.")
    time.sleep(1.5)

executor = concurrent.futures.ThreadPoolExecutor()

# Submit tasks
futures = []
for i in range(5):
    future = executor.submit(task, i)
    futures.append(future)

# Monitor running status
while any(f.running() for f in futures):
    print("Tasks are still running...")
    time.sleep(0.3)

print("All tasks have completed.")
executor.shutdown()

執行結果:

Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
All tasks have completed.

cancel()


嘗試取消任務。如果調用當前正在執行的任務或已完成的任務,則該方法將返回 False。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    if future.cancel():
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.") # ✔️✔️

cancelled()


檢查任務是否已被取消。

PYTHON
import time
import concurrent.futures

def task():
    time.sleep(1)
    print("Task executed.")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)

    future.cancel()
    if future.cancelled():  # Check task
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.") # ✔️✔️

exception()


返回調用任務所引發的 exception。

PYTHON
from concurrent import futures

def task():
    raise ValueError("Something went wrong!")

# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks
    future = executor.submit(task)
    
    result = future.exception()
    print(result)  # Output: Something went wrong!

as_completed()


返回一個 iterator,我們可以在任務完成後,立即處理任務結果。而不是等待所有任務完成,一次性返回。 這在處理大量任務或任務完成順序很重要時,特別有用。

PYTHON
as_completed(fs, timeout=None)

fs: iterable Future 物件

timeout: 等待結果的最大秒數。如果timeout=None,將無限期阻塞,直到結果可用。

as_completed() 和 map() 之間的主要區別在於 tasks 結果的順序。

as_completed():按 Submit tasks 完成的順序。 map():按 Submit tasks 順序。

範例:

import time
import concurrent.futures

def some_task(n):
    time.sleep(1)
    return n * 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(some_task, i) for i in range(5)]
    
    # Use as_completed()
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")

    # No use as_completed()
    for future in futures:
        result = future.result()
        print(f"Task result: {result}")

執行結果:

Task result: 2
Task result: 4
Task result: 0
Task result: 6
Task result: 8

Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8

wait()


等待 fs 給出的 Future 實例完成。

PYTHON
wait(fs, 
     timeout=None, 
     return_when=ALL_COMPLETED)
參數
說明

fs

需要等待的 Future 實例的序列。 可以是一個或多個的 iterable 物件(例如:List, Set)。 這些 Futures 可以由不同的 Executor 實例創建。

timeout

等待結果的最大秒數。如果 timeout=None,將無限期阻塞,直到 Future 完成。

return_when

指定返回的條件。 "FIRST_COMPLETED":當任何的 Future 完成或被取消時,立即將 wait() 返回。 "ALL_COMPLETED":當所有的 Future 完成或被取消時,立即將 wait() 返回。 "FIRST_COMPLETED":當任何 Future 完成並引發 Exception 時,立即將 wait() 返回。如果沒有引發 Exception,則等同於 "ALL_COMPLETED"。

範例 – FIRST_COMPLETED


PYTHON
import concurrent.futures
import time
def process_data(data):
    result = data
    time.sleep(data/3)
    return result

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 6)]

    # Wait futures
    done, not_done = concurrent.futures.wait(futures, return_when = 'FIRST_COMPLETED')

    # Retrieve results
    results = [future.result() for future in done]

print(results)

執行結果:

[1]

範例 – FIRST_EXCEPTION


PYTHON
import concurrent.futures
import time

def process_data(data):
    if data == 3:
        raise ValueError("Invalid input: 3")
    time.sleep(0.3)
    return data

# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 7)]

    # Wait futures
    done, not_done = concurrent.futures.wait(futures, 
                                             return_when = "FIRST_EXCEPTION")

    # Retrieve results
    results = []
    for future in done:
        try:
            result = future.result()
        except Exception as e:
            result = f"Error: {type(e).__name__} - {str(e)}"
        results.append(result)

print(results)

執行結果:

['Error: ValueError - Invalid input: 3', 1, 2]

參考資料


concurrent.futures — 啟動並行任務 — Python 3.11.4 文檔
multiprocessing — Process-based parallelism — Python 3.11.4 documentation
stoensin map – github gist
確定並發和並行的優勢 - 使用 Java 並發擴展您的代碼 - OpenClassrooms
ThreadPoolExecutor 與全局解釋器鎖 (GIL)
concurrent.futures 平行任務處理 - Python 教學 | STEAM 教育學習網
concurrent.futures — 創立非同步任務 — 你所不知道的 Python 標準函式庫用法 06 | louie_lu's blog
Python的GIL是什麼鬼,多線程性能研究究竟如何 • cenalulu's Tech Blog
並發與並行:有什麼區別? | 登錄半徑博客
08、多執行緒 和 多進程 :以斐波拉契亞數列計算為例,只需要切換一下類即可完成從多執行緒到多進程的轉化_鞍-的博客-CSDN博客
Python 好用模組教學 - concurrent.futures - MyApollo
並發執行 — Python 3.11.4 文檔
submit()
Page cover image