# concurrent.futures 【執行緒、程序】

## concurrent.futures

***

該 module 用於 asynchronously(非同步地、異步地)執行程式碼。使用 abstract class 直接管理複雜的 thread 或 process，使您能夠專注於撰寫程式碼，而無需深入了解底層細節。用法上比使用 multiprocessing、threading、asyncio module 容易得多，是相當推薦的 built-in library。它提供了兩個主要的 Class：`ThreadPoolExecutor` 針對 Thread(執行緒)，`ProcessPoolExecutor` 針對 Process(程序)。

<figure><img src="/files/cu8DxYwoQdB13qtbkxZd" alt="concurrent.futures module" width="563"><figcaption><p>concurrent.futures module</p></figcaption></figure>

使用 `ThreadPoolExecutor` 可以認為是 Concurrency，而 `ProcessPoolExecutor` 則可以當成 Parallelism。

<figure><img src="/files/tl3Chp9YuTRlYq7qgqlB" alt="Concurrency"><figcaption><p>Concurrency</p></figcaption></figure>

<figure><img src="/files/MP5yxH60GBS14F6utpzE" alt="Parallelism"><figcaption><p>Parallelism</p></figcaption></figure>

<div><figure><img src="/files/V6Rd6zs6b6vT8kwqLl1u" alt="Concurrency"><figcaption><p>Concurrency</p></figcaption></figure> <figure><img src="/files/HxiCfi2c16Gowb7ZyYeO" alt="Parallelism"><figcaption><p>Parallelism</p></figcaption></figure></div>

`ThreadPoolExecutor` 會在同一個 Process 裡面做事。當遇到被阻塞時，會 hang-up 並且由另外一個 Thread 繼續做事。

`ProcessPoolExecutor` 會建立新的 Process 來處理，當遇到被阻塞時，不會hang-up 並且持續不斷的計算。對於這種狀況，使用 Process 就如同開影分身，能夠讓整體的計算更快完成。

{% hint style="success" %}
**Thread vs Process**&#x20;

**Thread** ：程式執行任務的基本單位。&#x20;

**Process**：啟動應用程式時產生的執行實體，需要一定的 CPU 與記憶體資源，Process 由一到多個 Thread 組成，同一個 Process 裡的 Thread 可以共用記憶體資源。
{% endhint %}

{% hint style="info" %}
建立 thread 或是 process 都會花費時間成本，這些成本如果沒有辦法平均攤銷到計算裡面的話，就會變成阻力強大 overhead。
{% endhint %}

{% hint style="info" %}
相較於 `multiprocessing`，`concurrent.futures` 提供了一組更高階的和更簡單的用法 (例如：`submit()`， `map()`)。如果不關心非同步執行的細項時 (例如：`barrier`，`lock` )，使用 `concurrent.futures` 即可。
{% endhint %}

## Executor

***

提供非同步執行調用方法的 abstract class。

### submit()

用於在單獨的 thread 中非同步執行可調用的物件(函數或方法)，並返回表示計算結果的 Future 物件。

{% code title="PYTHON" %}

```python
submit(fn, *args, **kwargs)
```

{% endcode %}

`fn`：要非同步執行的可調用的物件。 `*args`：傳遞給可調用物件的 positional arguments。 `**kwargs`：傳遞給可調用物件的 keyword arguments。

#### 範例 – 基礎 1

{% code title="PYTHON" %}

```python
from concurrent import futures

def process_data(data):
    for x in range(data):
        print(x)
    return data ** 2

# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks
    future1 = executor.submit(process_data, 10)
    future2 = executor.submit(process_data, 11)

    # Wait results
    result1 = future1.result()
    result2 = future2.result()
    
    print(result1)
    print(result2)
```

{% endcode %}

**執行結果：**

```TXT
0
1
2
0
1
2
3
4
5
6
7
8
9
10
11
3
4
5
6
7
8
9
10
121
144
```

#### 範例 – 基礎 2

{% code title="PYTHON" %}

```python
import concurrent.futures
import time

def task(index):
    print(f"Task {index} executed.")
    time.sleep(1)

# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

time_1 = time.time()

# Submit tasks
futures = []
for i in range(5):
    future = executor.submit(task, i)
    futures.append(future)

executor.shutdown()
print("All tasks have completed.")

time_2 = time.time()
print(f'{time_2 - time_1:.4f} s')
```

{% endcode %}

**執行結果：**

```TXT
Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
All tasks have completed.
1.0043 s
```

#### 範例 – 串接 task

{% code title="PYTHON" %}

```python
import concurrent.futures

def task_a(data):
    result_a = data * 2
    return result_a

def task_b(data):
    result_b = data * 2
    return result_b

def task_c(data):
    result_c = data * 2
    return result_c

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    task_a = executor.submit(task_a, 2)
    task_b = executor.submit(task_b, task_a.result())
    task_c = executor.submit(task_c, task_b.result())

    print(task_a.result())
    print(task_b.result())
    print(task_c.result())
```

{% endcode %}

**執行結果：**

```TXT
4
8
16
```

### map()

***

返回一個 iterator，我們可以在任務完成後，立即處理任務結果。而不是等待所有任務完成，一次性返回。 這在處理大量任務或任務完成順序很重要時，特別有用。

{% code title="PYTHON" %}

```python
map(func, 
    *iterables, 
    timeout=None, 
    chunksize=1)
```

{% endcode %}

<table><thead><tr><th width="151">參數</th><th>說明</th></tr></thead><tbody><tr><td><strong>func</strong></td><td>調用的函數。如果 <code>func</code> 引發 exception 且從 iterator 檢索其值時，將引發該 exception。</td></tr><tr><td><strong>*iterables</strong></td><td>將 <code>func</code> 應用於 <code>iterables</code> 的每個項目。</td></tr><tr><td><strong>timeout</strong></td><td>等待結果的最大秒數。如果 <code>timeout=None</code>，將無限期阻塞，直到結果可用。</td></tr><tr><td><strong>chunksize</strong></td><td>當 <code>map()</code> 與 <code>ProcessPoolExecutor</code> 一起使用時，將輸入的 iterables 物件劃分為較小的 chunks(區塊)。<br>每個 chunk 作為獨立的任務提交，由多個 work process 同時處理。 此參數確定每個 chunk 中，將有多少 iterable 物件在一起。當您有很長的 iterables 物件時，使用較大的 <code>chunksize</code> 可以增強 <code>map()</code> 的性能。(減少Submit tasks 以及主要 process 和 work process 之間通訊的開銷)</td></tr></tbody></table>

{% hint style="info" %}
**`as_completed()` 和 `map()` 之間的主要區別在於 tasks 結果的順序。**&#x20;

`as_completed()`：按 Submit tasks 完成的順序。 \
`map()`：按 Submit tasks 順序。
{% endhint %}

#### 範例 – 基礎

{% code title="PYTHON" %}

```python
import time
import concurrent.futures

def some_task(n):
    time.sleep(1)
    return n * 2

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(some_task, i) for i in range(10)]
    
    # Use as_completed()
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")
        
    # Use map()
    inputs = [i for i in range(10)]
    results = executor.map(some_task, inputs)
    for result in results:
        print(f"Task result: {result}")
```

{% endcode %}

**執行結果：**

```TXT
Task result: 2
Task result: 4
Task result: 0
Task result: 8
Task result: 10
Task result: 6
Task result: 14
Task result: 16
Task result: 12
Task result: 18

Task result: 0
Task result: 2
Task result: 4
Task result: 6
Task result: 8
Task result: 10
Task result: 12
Task result: 14
Task result: 16
Task result: 18
```

#### 範例 – Exception

{% code title="PYTHON" %}

```python
from concurrent.futures import ThreadPoolExecutor

# 不會立即引發 Exception。
def divide(x, y):
    if y == 0:
        raise ZeroDivisionError("Cannot divide by zero")
    return x / y

# Create worker threads
with ThreadPoolExecutor() as executor:
    # Submit tasks
    results = executor.map(divide, [40, 30, 20, 10], [2, 2, 0, 2])

    # Retrieve results
    for result in results:
        try:
            print(result)
        except Exception as e:
            print(f"Exception occurred: {e}")

```

{% endcode %}

**執行結果：**

```TXT
20.0
15.0
[line 15] ZeroDivisionError: Cannot divide by zero
```

#### 範例 – chunksize(參數)

{% code title="PYTHON" %}

```python
import time
import concurrent.futures

def measure_execution_time(func):
	def wrapper(*args, **kwargs):
		start_time = time.time()
		result = func(*args, **kwargs) # function callable
		end_time = time.time()
		print(f"{func.__name__} executed in {end_time - start_time:.3f} s.")
		return result
	return wrapper

def square(x):
	return x * 2

@measure_execution_time
def work_process(nums, chunksize):
	with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
		results = executor.map(square, nums, chunksize=chunksize)

if __name__ == '__main__':
	nums = range(1, 1000)  # 1000 tasks

	work_process(nums, 1)  # 0.374 s
	work_process(nums, 2)  # 0.227 s
	work_process(nums, 4)  # 0.172 s
	work_process(nums, 7)  # 0.150 s
	work_process(nums, 10) # 0.140 s
	work_process(nums, 15) # 0.134 s
	work_process(nums, 20) # 0.144 s
	work_process(nums, 30) # 0.137 s

```

{% endcode %}

### shutdown()

***

指示不再提交任何任務並且釋放資源。

{% code title="PYTHON" %}

```python
shutdown(wait=True, *, cancel_futures=False)
```

{% endcode %}

`wait`：是否應該阻塞，直到 pending futures 完成。 如果 `wait=True` ，將等待所有提交的任務完成後再返回。 如果 `wait=False`，將立即返回和釋放資源，而不等待任務完成。 `cancel_futures`：這個參數指定是否取消還沒有開始的 futures。如果`cancel_futures=False`，pending futures 將被允許在關閉之前完成。

{% hint style="info" %}
使用 with 語句，將會關閉 Executor。等效 `shutdown(wait=True, cancel_futures=False)`
{% endhint %}

{% code title="PYTHON" %}

```python
import time
import concurrent.futures

def task(i):
    print(i, 'Finish')
    
# Create worker threads
executor = concurrent.futures.ThreadPoolExecutor()
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)

executor.shutdown()
```

{% endcode %}

{% code title="PYTHON" %}

```python
import time
import concurrent.futures

def task(i):
    print(i, 'Finish')
    
# Create worker threads
with concurrent.futures.ThreadPoolExecutor() as executor:
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
```

{% endcode %}

## ThreadPoolExecutor

***

一個 Executor subclass，實現基於 Thread 的 Executor。 通常用於 I/O 密集型任務。

{% code title="PYTHON" %}

```python
ThreadPoolExecutor(max_workers=None, 
                   thread_name_prefix='', 
                   initializer=None, 
                   initargs=())
```

{% endcode %}

<table><thead><tr><th width="209">參數</th><th>說明</th></tr></thead><tbody><tr><td><strong>max_workers</strong></td><td>調用多少個 thread。<br>如果 <code>max_workers=None</code> ，將預設為 <code>min(32, os.cpu_count()+4)</code>，並且保證至少<code>max_workers=5</code>。它最多使用32個 CPU 核心來執行釋放 GIL 的 CPU 綁定任務。</td></tr><tr><td><strong>thread_name_prefix</strong></td><td>控制 worker thread 的名稱。 這對於調試目的很有用，允許輕鬆識別 ThreadPool 中的 thread。</td></tr><tr><td><strong>initializer</strong></td><td>可調用物件，在每個 worker thread 開始時調用。如果 <code>initializer</code> 引發 exception，它將導致等待處理的作業引發 <code>BrokenThreadPool</code> Exception。</td></tr><tr><td><strong>initargs</strong></td><td>如果你提供了 <code>initializer</code>，你也可以提供一個 Tuple (<code>initargs</code>)。<code>initargs</code> 將被當作參數，傳遞給 <code>initializer</code> 函數。</td></tr></tbody></table>

{% hint style="info" %}
**`ThreadPoolExecutor` 通常用於處理涉及 I/O 操作的任務(例如：讀取或寫入文件、進行網絡請求、 interacting 數據庫)。**&#x20;

當任務花費大部分時間在等待 I/O 操作完成，而不是使用 CPU 資源時，`ThreadPoolExecutor` 特別有用。當一個 Thread 執行的 I/O 操作阻塞時，會釋放GIL，讓另一個Python線程執行。這表明 `ThreadPoolExecutor` 應該限於那些釋放 GIL 的任務。
{% endhint %}

{% hint style="info" %}
**`ThreadPoolExecutor(worker)` 應該要大於 `ProcessPoolExecutor(worker)`。**&#x20;

原因是 I/O 綁定任務可以從擁有更多 concurrent threads 中獲得幫助，因為 Thread 的大部分時間都花在等待 I/O 操作完成上。擁有更多的 worker thread有助於在重疊 I/O 操作時，最大限度地提高並發性和效率。
{% endhint %}

{% hint style="info" %}
**建議不應將 `ThreadPoolExecutor` 用於長時間運行的任務。**&#x20;

如果主要的 thread 遇到未處理的異常或意外終止，而沒有給 thread 完成的機會，則可能導致不可預測的行為和潛在的資源洩漏。 對於長時間運行的任務，使用 `ProcessPoolExecutor` 或 managing threads explicitly 等替代方法可能更好地控制任務的執行和終止。
{% endhint %}

如果在提交新任務時，Pool 中有可用的空閒 worker thread，它不會立即創建新的工作線程，而是首先重用空閒 worker thread 來執行新任務。即在創建 thread 之前，重用空閒 worker thread，來達到 `max_workers` 上限。

所有排隊的 worker thread 都將自動在直譯器退出之前加入，避免突然終止 thread 並可能留下未完成的工作。`ThreadPoolExecutor` 的內部退出處理程序優先於調用任何已註冊的退出處理程序(例如：atexit module)。

Python 中 GIL 的存在會影響 `ThreadPoolExecutor`。`ThreadPoolExecutor` 維護一個固定大小的 worker thread pool，支援並 concurrent 任務。但由於 CPython 中的 GIL 阻止多個 thread 同時執行 Python bytecode，意味著任務不會 parallel。

{% hint style="success" %}
Concurrency：表示任務之間的順序獨立性，它們可以隨時或同時完成。\
Parallel：是 concurrency 的一個子集，表示任務是同時執行的。
{% endhint %}

<figure><img src="/files/BfngEyKFR7FkLygougZX" alt="Parallel vs Concurrency"><figcaption><p>Parallel vs Concurrency</p></figcaption></figure>

### 範例 – thread\_name\_prefix(參數)

{% code title="PYTHON" %}

```python
import threading
import concurrent.futures

def task(i):
    thread_name = threading.current_thread().name
    print(i,"Executing task in thread:", thread_name)

# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="WorkerThread", max_workers=5) as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(10)]
```

{% endcode %}

**執行結果：**

```TXT
0 Executing task in thread: WorkerThread_0
1 Executing task in thread: WorkerThread_0
2 Executing task in thread: WorkerThread_1
3 Executing task in thread: WorkerThread_0
4 Executing task in thread: WorkerThread_2
6 Executing task in thread: WorkerThread_3
7 Executing task in thread: WorkerThread_0
5 Executing task in thread: WorkerThread_1
8 Executing task in thread: WorkerThread_4
9 Executing task in thread: WorkerThread_2
```

### 範例 –  worker thread

{% code title="PYTHON" %}

```python
import threading
import concurrent.futures

def task(i):
    thread_name = threading.current_thread().name
    print(i,"Executing task in thread:", thread_name)
    i = 0
    for _ in range(1000):
        i = i + 1
    print('Finish')
    
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="WorkerThread", max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(6)]
```

{% endcode %}

**執行結果：**

```
0 Executing task in thread: WorkerThread_0
Finish
1 Executing task in thread: WorkerThread_1
2 Executing task in thread: WorkerThread_0
Finish
Finish
4 Executing task in thread: WorkerThread_1
5 Executing task in thread: WorkerThread_0
Finish
Finish
3 Executing task in thread: WorkerThread_2
Finish
```

## ProcessPoolExecutor

***

一個 Executor subclass，實現基於 process 的 Executor，通常用於 CPU 密集型任務。

{% code title="PYTHON" %}

```python
ProcessPoolExecutor(max_workers=None, 
                    mp_context=None, 
                    initializer=None, 
                    initargs=(), 
                    max_tasks_per_child=None)
```

{% endcode %}

<table><thead><tr><th width="217">參數</th><th>說明</th></tr></thead><tbody><tr><td><strong>max_workers</strong></td><td>調用多少個 process。<br>預設為設備上的邏輯處理器(processors)數量。 在 Windows 上，必須 <code>max_workers &#x3C;= 61</code>。</td></tr><tr><td><strong>mp_context</strong></td><td>使用 multiprocessing context。是一項高級功能，對於大多數用法而言並不是必需的。</td></tr><tr><td><strong>initializer</strong></td><td>可調用物件，在每個 worker process 開始時調用。如果 <code>initializer</code> 引發 exception，它將導致等待處理的作業引發 <code>BrokenProcessPool</code> Exception。</td></tr><tr><td><strong>initargs</strong></td><td>如果你提供了 <code>initializer</code>，你也可以提供一個 Tuple (<code>initargs</code>)。<code>initargs</code> 將被當作參數，傳遞給 <code>initializer</code> 函數。</td></tr><tr><td><strong>max_tasks_per_child</strong></td><td>指定 worker process 在終止和替換之前可以處理的最大任務數量，也就是控制 lifetime(生命週期)。<br>在沒有 <code>mp_context</code> 參數的情況下，當指定 <code>max_tasks_per_child</code> 時，將使用 <code>spawn</code> 啟動方法。 <code>spawn</code> 方法創建一個新的 Python 直譯器 process ，並啟動其中的 worker process。<code>spawn</code> 方法與 <code>fork</code> 方法不兼容。</td></tr></tbody></table>

`ProcessPoolExecutor` Class 使用 `multiprocessing` module，使其能夠繞過 \[\[GIL 【全域直譯器鎖】]]。 但是，必須先對數據進行序列化（pickled）。 因此， `ProcessPoolExecutor` 只能執行和返回 picklable 物件。

`__main__` 必須由 worker subprocesses 導入。這意味著 `ProcessPoolExecutor` 將無法在 interactive 直譯器中工作。此要求確保必要的代碼(函數定義和全域變數)可用於 worker process。也就是必須使用 `if __name__ == '__main__':` 。

將可調用物件(函數或方法)提交給 `ProcessPoolExecutor` 執行時，避免從可調用物件中，再次調用任何 `Executor` 或 `Future` 方法。 這樣做可能會導致執行停止並且無法取得進展的死鎖情況。

### 功能比較

***

<table data-header-hidden><thead><tr><th width="226.33333333333331"></th><th></th><th></th></tr></thead><tbody><tr><td>功能</td><td><code>ThreadPoolExecutor</code></td><td><code>ProcessPoolExecutor</code></td></tr><tr><td><strong>Execution Context</strong></td><td>共享 memory</td><td>memory 相互獨立</td></tr><tr><td><strong>Concurrency Model</strong></td><td>Concurrency</td><td>Parallelism</td></tr><tr><td><strong>Resource Utilization</strong></td><td>CPU 密集型任務</td><td>I/O 密集型任務</td></tr><tr><td><strong>Isolation and Safety</strong></td><td>如果某個任務遇到異常或崩潰，不會影響其他任務或主要 process。</td><td>會影響同一 process 中的其他任務，從而導致更複雜的錯誤處理和調試。</td></tr><tr><td><strong>Overhead and Startup Time</strong></td><td>高效率</td><td>需要更多的 overhead 和啟動時間。啟動新的 process 需要複製整個 process memory。</td></tr><tr><td><strong>Shared State and Synchronization</strong></td><td>可以直接訪問和修改 process memory 中的共享數據，但必須謹慎對共享資源的訪問，以避免競爭條件。</td><td>共享數據需要明確的機制。</td></tr></tbody></table>

### 範例 – 基礎

***

{% code title="PYTHON" %}

```python
import multiprocessing
import concurrent.futures

def task(i):
    return i * 2

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            print(future.result())
```

{% endcode %}

{% code title="PYTHON" %}

```python
import concurrent.futures

def task(i):
    return i * 2

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(task, i) for i in range(6)]
        for future in futures:
            print(future.result())

if __name__ == '__main__':
    main()
```

{% endcode %}

### 範例 – Process vs Thread

***

{% code title="PYTHON" %}

```python
import concurrent.futures
import time

# Fibonacci sequence (費波那契數、黃金分割數)
def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def thread_function():
    time_1 = time.time()
    # Create a ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        # Submit tasks
        futures = [executor.submit(fib, i) for i in range(10, 37)]

        # Retrieve results
        results = [future.result() for future in futures]
    time_2 = time.time()
    print(f'{time_2 - time_1:.2} s')
    print(results)

def process_function():
    time_1 = time.time()
    # Create a ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        # Submit tasks
        futures = [executor.submit(fib, i) for i in range(10, 37)]

        # Retrieve results
        results = [future.result() for future in futures]
    time_2 = time.time()
    print(f'{time_2 - time_1:.2} s')
    print(results)


if __name__ == '__main__':
    thread_function()
    process_function()
```

{% endcode %}

**執行結果：**

```TXT
5.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
2.3 s
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352]
```

\ <br>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.xiwind-corp.com/tech/python-library/concurrent.futures-zhi-hang-xu-cheng-xu.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
