import concurrent.futures
def square(x):
    """Return ``x`` raised to the power of two."""
    return x ** 2
def process_result(future):
    """Done-callback: fetch the finished future's result and print it.

    ``future.result()`` re-raises whatever the task raised, so a failed
    task surfaces here instead of printing a value.
    """
    result = future.result()
    print(result)
# Create a pool of two worker threads; leaving the ``with`` block waits for
# both tasks (and therefore their callbacks) to finish.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    task1 = executor.submit(square, 5)
    # process_result is invoked with the future once the task has finished.
    task1.add_done_callback(process_result)
    task2 = executor.submit(square, 7)
    task2.add_done_callback(process_result)
running()
檢查任務是否正在執行中;若任務正在執行(此時已無法取消)則返回 True。
PYTHON
import time
import concurrent.futures
def task():
    """Block for one second to simulate work, then report completion."""
    time.sleep(1)
    print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    # The check must happen while the pool is still open: once the ``with``
    # block exits, the task has already finished and running() is False.
    if future.running():  # Check task
        print("Task is currently running.")  # expected branch
    else:
        print("Task is not running.")
PYTHON
import concurrent.futures
import time
def task(index):
    """Announce task ``index``, then block for 1.5 seconds to simulate work."""
    print(f"Task {index} executed.")
    time.sleep(1.5)
# The context manager shuts the pool down on exit, even if submitting or
# monitoring raises, replacing the explicit executor.shutdown() call.
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(5)]

    # Monitor running status.
    # NOTE: running() is only True while a worker is executing the task;
    # a task that is still queued reports False. Use done() when the goal
    # is strictly "has everything finished?".
    while any(f.running() for f in futures):
        print("Tasks are still running...")
        time.sleep(0.3)

    print("All tasks have completed.")
執行結果:
Task 0 executed.
Task 1 executed.
Task 2 executed.
Task 3 executed.
Task 4 executed.
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
Tasks are still running...
All tasks have completed.
cancel()
嘗試取消任務。如果任務正在執行或已經完成而無法取消,則該方法將返回 False;否則任務會被取消,並返回 True。
PYTHON
import time
import concurrent.futures
def task():
    """Block for one second to simulate work, then report completion."""
    time.sleep(1)
    print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    # cancel() only succeeds while the task is still waiting in the queue;
    # once a worker has started (or finished) it, cancel() returns False.
    if future.cancel():
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.")  # expected branch
cancelled()
檢查任務是否已被取消。
PYTHON
import time
import concurrent.futures
def task():
    """Block for one second to simulate work, then report completion."""
    time.sleep(1)
    print("Task executed.")
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    # Request cancellation; the return value is deliberately ignored here
    # because cancelled() below reports whether it actually took effect.
    future.cancel()
    if future.cancelled():  # Check task
        print("Task cancelled.")
    else:
        print("Task could not be cancelled.")  # expected branch
exception()
返回任務執行時所引發的 exception;如果任務正常完成而未引發任何 exception,則返回 None。
PYTHON
from concurrent import futures
def task():
    """Always fail with ``ValueError`` so the caller can inspect the exception."""
    raise ValueError("Something went wrong!")
# Create worker threads
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks
    future = executor.submit(task)
    # exception() blocks until the task is done and returns the exception
    # it raised instead of re-raising it.
    result = future.exception()
    print(result)  # Output: Something went wrong!
import time
import concurrent.futures
def some_task(n, delay=1):
    """Simulate slow work and return ``n`` doubled.

    Args:
        n: Value to double.
        delay: Seconds to sleep before returning; defaults to the original
            one-second pause.

    Returns:
        ``n * 2``.
    """
    time.sleep(delay)
    return n * 2
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(some_task, i) for i in range(5)]

    # With as_completed(): futures are yielded as each one finishes.
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        print(f"Task result: {result}")

    # Without as_completed(): results are read in submission order, and
    # each result() call blocks until that particular future is done.
    for future in futures:
        result = future.result()
        print(f"Task result: {result}")
import concurrent.futures
import time
def process_data(data):
    """Sleep for ``data / 3`` seconds, then return ``data`` unchanged.

    Larger inputs therefore take proportionally longer to complete.
    """
    time.sleep(data / 3)
    return data
# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 6)]

    # Return as soon as any single future finishes (or is cancelled).
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
    )

    # Retrieve results of the futures that were finished at that moment.
    results = [future.result() for future in done]
    print(results)
執行結果:
[1]
範例 – FIRST_EXCEPTION
PYTHON
import concurrent.futures
import time
def process_data(data, delay=0.3):
    """Return ``data`` after a short pause, rejecting the value 3.

    Args:
        data: Value to process.
        delay: Seconds to sleep before returning; defaults to the original
            0.3-second pause.

    Returns:
        ``data`` unchanged.

    Raises:
        ValueError: If ``data == 3``; raised immediately, before any sleep.
    """
    if data == 3:
        raise ValueError("Invalid input: 3")
    time.sleep(delay)
    return data
# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Submit tasks
    futures = [executor.submit(process_data, i) for i in range(1, 7)]

    # Return as soon as any future raises; if none raises, this waits for
    # all of them.
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_EXCEPTION
    )

    # Retrieve results
    results = []
    for future in done:
        try:
            result = future.result()
        except Exception as e:
            # Deliberately broad: any task failure is reported as text
            # rather than aborting the collection loop.
            result = f"Error: {type(e).__name__} - {e}"
        results.append(result)
    print(results)