import time
start = time.monotonic()
# Simulating a long running process
time.sleep(5)
end = time.monotonic()
elapsed_time = end - start
print(f'The process took {elapsed_time} seconds to complete.')
範例 – 速率限制器
速率限制器 (RateLimiter) 是一種工具,用於監視實體 (例如:使用者或 IP address) 在一定時間內 (例如:一分鐘或一秒) 向 server 或 application 發出的 request 數量。 如果 request 超過限制,則速率限制器會阻止所有超出的調用。通常用於防止 server 同時被太多 request 淹沒,來維持 server 健康。例如:防止濫用、減少垃圾郵件。
import time
class SimpleRateLimiter:
def __init__(self):
self.allow_request = time.monotonic()
def is_allowed(self):
current_time = time.monotonic()
if self.allow_request <= current_time:
self.allow_request = current_time + 1.0
return True
return False
def my_func():
#print("Called!")
...
rate_limiter = SimpleRateLimiter()
# Simulate 20 requests
for i in range(20):
time.sleep(0.5) # Simulate processing tasks
if rate_limiter.is_allowed():
print(f"Execution user is allowed.")
my_func()
else:
print(f"Execution user is blocked.")
執行結果:
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
...
速率限制器 – 正式版
PYTHON
import time
class RateLimiter:
def __init__(self, max_calls_per_second):
self.max_calls_per_second = max_calls_per_second
self.last_call = None
def call(self, func, *args, **kwargs):
now = time.monotonic()
if self.last_call is not None and (now - self.last_call < 1 / self.max_calls_per_second):
return "Rate limit exceeded"
else:
self.last_call = now
return func(*args, **kwargs)
rate_limiter = RateLimiter(1) # Allow 1 call per second
def my_func():
return "Called!"
# Call the function within the rate limit
print(rate_limiter.call(my_func)) # Outputs: "Called!"
print(rate_limiter.call(my_func)) # Outputs: "Rate limit exceeded"
import time
# Start the timers
start_process_time = time.process_time()
start_perf_counter = time.perf_counter()
# Run a loop for a bit
for _ in range(1000000):
pass
# Sleep for a bit
time.sleep(2)
# Stop the timers
end_process_time = time.process_time()
end_perf_counter = time.perf_counter()
print('process_time:', end_process_time - start_process_time)
# Output: 0.0156 s
print('perf_counter:', end_perf_counter - start_perf_counter)
# Output: 2.0361 s
thread_time(), thread_time_ns()
返回當前 thread 的系統和使用者 CPU 時間之和的值 (以秒為單位)。 它不包括睡眠期間經過的時間 (sleep())。返回值的參考點是未定義的,因此只有同一 thread 中兩次調用結果之間的差異才有效。
由於全域直譯器鎖 (GIL),在 Python thread 中,運行的 CPU 密集型任務不會 Parallel,而是會 Concurrency。詳見:concurrent.futures【執行緒、程序】。
PYTHON
import time
import concurrent.futures
def cpu_intensive_task(thread_num):
start_time = time.thread_time()
for i in range(10**7): # A simple CPU intensive task
pass
end_time = time.thread_time()
return f"Thread time taken by cpu_intensive_task in Worker-{thread_num}: {end_time - start_time} sec"
# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Use list comprehension to create futures
futures = [executor.submit(cpu_intensive_task, i) for i in range(5)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
執行結果:
Thread time taken by cpu_intensive_task in Worker-0: 0.09375 sec
Thread time taken by cpu_intensive_task in Worker-2: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-1: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-3: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-4: 0.109375 sec