import time
import concurrent.futures
def some_task(n):
time.sleep(1)
return n * 2
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
futures = [executor.submit(some_task, i) for i in range(10)]
# Use as_completed()
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Task result: {result}")
# Use map()
inputs = [i for i in range(10)]
results = executor.map(some_task, inputs)
for result in results:
print(f"Task result: {result}")
import threading
import concurrent.futures
def task(i):
thread_name = threading.current_thread().name
print(i,"Executing task in thread:", thread_name)
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
thread_name_prefix="WorkerThread", max_workers=5) as executor:
# Submit tasks
futures = [executor.submit(task, i) for i in range(10)]
執行結果:
0 Executing task in thread: WorkerThread_0
1 Executing task in thread: WorkerThread_0
2 Executing task in thread: WorkerThread_1
3 Executing task in thread: WorkerThread_0
4 Executing task in thread: WorkerThread_2
6 Executing task in thread: WorkerThread_3
7 Executing task in thread: WorkerThread_0
5 Executing task in thread: WorkerThread_1
8 Executing task in thread: WorkerThread_4
9 Executing task in thread: WorkerThread_2
範例 – worker thread
PYTHON
import threading
import concurrent.futures
def task(i):
thread_name = threading.current_thread().name
print(i,"Executing task in thread:", thread_name)
i = 0
for _ in range(1000):
i = i + 1
print('Finish')
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(
thread_name_prefix="WorkerThread", max_workers=3) as executor:
# Submit tasks
futures = [executor.submit(task, i) for i in range(6)]
執行結果:
0 Executing task in thread: WorkerThread_0
Finish
1 Executing task in thread: WorkerThread_1
2 Executing task in thread: WorkerThread_0
Finish
Finish
4 Executing task in thread: WorkerThread_1
5 Executing task in thread: WorkerThread_0
Finish
Finish
3 Executing task in thread: WorkerThread_2
Finish
ProcessPoolExecutor
一個 Executor subclass,實現基於 process 的 Executor,通常用於 CPU 密集型任務。
需要更多的 overhead 和啟動時間。啟動新的 process 需要複製整個 process memory。
Shared State and Synchronization
可以直接訪問和修改 process memory 中的共享數據,但必須謹慎對共享資源的訪問,以避免競爭條件。
共享數據需要明確的機制。
範例 – 基礎
PYTHON
import multiprocessing
import concurrent.futures
def task(i):
return i * 2
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(6)]
for future in futures:
print(future.result())
PYTHON
import concurrent.futures
def task(i):
return i * 2
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(6)]
for future in futures:
print(future.result())
if __name__ == '__main__':
main()
範例 – Process vs Thread
PYTHON
import concurrent.futures
import time
# Fibonacci sequence (費波那契數、黃金分割數)
def fib(n):
if n <= 2:
return 1
return fib(n-1) + fib(n-2)
def thread_function():
time_1 = time.time()
# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
# Submit tasks
futures = [executor.submit(fib, i) for i in range(10, 37)]
# Retrieve results
results = [future.result() for future in futures]
time_2 = time.time()
print(f'{time_2 - time_1:.2} s')
print(results)
def process_function():
time_1 = time.time()
# Create a ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
# Submit tasks
futures = [executor.submit(fib, i) for i in range(10, 37)]
# Retrieve results
results = [future.result() for future in futures]
time_2 = time.time()
print(f'{time_2 - time_1:.2} s')
print(results)
if __name__ == '__main__':
thread_function()
process_function()