
time(), time_ns()
time()
以 float 形式返回自紀元以來的時間(以秒為單位)。 閏秒的處理取決於平台。 在 Windows 和大多數 Unix 系統上,閏秒不計入自紀元以來的秒數中。 這通常稱為 Unix 時間。
time_ns()
與 time()
類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。
import time
current_time = time.time()
print('The current time is', current_time) # Output: 1688807663.873429
start_time = time.time()
time.sleep(1)
end_time = time.time()
print(f"{end_time - start_time:.4f} s") # Output: 1.0123 s
monotonic(), monotonic_ns()
monotonic()
返回單調遞增時鐘 (Monotonic Clock),以秒為單位。 時鐘不受系統時鐘更新的影響。 返回值的參考點未定義,因此只有兩次調用結果之間的差異才有效。
這使得它在您想要測量經過時間而不必擔心由於閏秒、DST (Daylight Saving Time)或系統時鐘的其他更改,而導致的時間變化的情況下非常有用。
monotonic_ns()
與 monotonic()
類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。
import time
start = time.monotonic()
# Simulating a long running process
time.sleep(5)
end = time.monotonic()
elapsed_time = end - start
print(f'The process took {elapsed_time} seconds to complete.')
範例 – 速率限制器
速率限制器 (RateLimiter) 是一種工具,用於監視實體 (例如:使用者或 IP address) 在一定時間內 (例如:一分鐘或一秒) 向 server 或 application 發出的 request 數量。 如果 request 超過限制,則速率限制器會阻止所有超出的調用。通常用於防止 server 同時被太多 request 淹沒,來維持 server 健康。例如:防止濫用、減少垃圾郵件。
想像一下您有一個程序,其中某些部分每秒執行次數不得超過一次。 您可以實現一個簡單的速率限制器,來強制執行此限制。
SimpleRateLimiter
Class 有一個 is_allowed
方法,用於檢查當前時間 (monotonic()
) 是否大於或等於 self.allow_request
。
如果是,則意味著自上次允許執行以來已經過去了 1 秒 (或更多)。因此將 self.allow_request
設置為當前時間加 1 秒,並返回 True
。 如果不是,則表示已經過去了不到 1 秒,所以返回 False
。
在循環中,我們嘗試每 0.5 秒執行一些程式碼。 但我們設置的速率限制器只允許它每 1 秒執行一次。 因此,一半的執行嘗試將被阻止。
import time
class SimpleRateLimiter:
def __init__(self):
self.allow_request = time.monotonic()
def is_allowed(self):
current_time = time.monotonic()
if self.allow_request <= current_time:
self.allow_request = current_time + 1.0
return True
return False
def my_func():
#print("Called!")
...
rate_limiter = SimpleRateLimiter()
# Simulate 20 requests
for i in range(20):
time.sleep(0.5) # Simulate processing tasks
if rate_limiter.is_allowed():
print(f"Execution user is allowed.")
my_func()
else:
print(f"Execution user is blocked.")
執行結果:
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
...
速率限制器 – 正式版
import time
class RateLimiter:
def __init__(self, max_calls_per_second):
self.max_calls_per_second = max_calls_per_second
self.last_call = None
def call(self, func, *args, **kwargs):
now = time.monotonic()
if self.last_call is not None and (now - self.last_call < 1 / self.max_calls_per_second):
return "Rate limit exceeded"
else:
self.last_call = now
return func(*args, **kwargs)
rate_limiter = RateLimiter(1) # Allow 1 call per second
def my_func():
return "Called!"
# Call the function within the rate limit
print(rate_limiter.call(my_func)) # Outputs: "Called!"
print(rate_limiter.call(my_func)) # Outputs: "Rate limit exceeded"
速率限制器 – Pythonic
在此示例中,rate_limited 是一個 decorator factory。 它將每秒最大調用次數作為輸入,並返回一個 decorator。
這種方法可以說更Pythonic,因為它將速率限制邏輯封裝到單個 decorator 中,然後只需在函數定義之前添加 @rate_limited(max_calls_per_second)
即可輕鬆應用於任何函數。
import time
def rate_limited(max_calls_per_second):
last_called = 0.0
def decorator(func):
def wrapper(*args, **kwargs):
nonlocal last_called
now = time.monotonic()
if now - last_called < 1 / max_calls_per_second: # Set certain rate
return "Rate limit exceeded"
else:
last_called = now
return func(*args, **kwargs)
return wrapper
return decorator
@rate_limited(1) # Allow 1 call per second
def my_func():
return "Called!"
# Call the function within the rate limit
print(my_func()) # Outputs: "Called!"
time.sleep(1)
print(my_func()) # Outputs: "Called!"
print(my_func()) # Outputs: "Rate limit exceeded"
import time
class RateLimiter:
def __init__(self, max_calls_per_second):
self.max_calls_per_second = max_calls_per_second
self.last_call = 0
self.__count = 0
def __call__(self, func):
def wrapper(*args, **kwargs):
now = time.monotonic()
if now - self.last_call < 1 / self.max_calls_per_second:
return "Rate limit exceeded"
else:
self.__count += 1
self.last_call = now
return func(*args, **kwargs)
return wrapper
@property
def counter(self):
return self.__count
limiter = RateLimiter(1) # Allow 1 call per second
@limiter
def my_func():
return "Called!"
# Call the function within the rate limit
print(my_func()) # Outputs: "Called!"
time.sleep(1)
print(my_func()) # Outputs: "Called!"
print(my_func()) # Outputs: Rate limit exceeded
print(limiter.counter) # Outputs: 2
範例 – Token 存儲桶速率限制器
該程式碼實現了一種稱為 token bucket (桶) 的速率限制策略。重新填充的 Tokens 數量永遠不會超過 max_rate
。 Request 消耗存儲桶中的 Token。 如果桶中沒有剩餘 Tokens,則 request 將被丟棄或排隊等待稍後處理。
如果 Token 被消耗,則返回 True
。 如果自上次消耗 Token 以來,沒有重新填充足夠的 Token,則返回 False
。 在這種情況下,程序會休眠一小段時間,然後再重試。
from math import ceil
import time
class RateLimiter:
def __init__(self, max_rate):
self.max_rate = max_rate
self.tokens = max_rate
self.last_refill = time.monotonic()
def consume(self):
now = time.monotonic()
elapsed = now - self.last_refill
# Refills the token bucket
self.tokens += elapsed * self.max_rate
# ensures that the number of tokens in the bucket never exceeds "max_rate"
self.tokens = min(self.tokens, self.max_rate)
if self.tokens < 1.0:
return False
self.tokens -= 1.0
self.last_refill = now
return True
@property
def tokensc(self):
return self.tokens
@tokensc.setter
def tokensc(self, value):
self.tokens = value
self.max_rate = value
def my_func():
#print("Called!")
...
rate_limiter = RateLimiter(1.0)
rate_limiter.tokensc = 10.0
# Simulate 10 requests
for i in range(10):
while not rate_limiter.consume():
time.sleep(0.1)
print(f'Request {i + 1} processed. Tokens left: {int(rate_limiter.tokensc)}')
my_func()
rate_limiter.tokensc = 1.0
# Simulate 10 requests
for i in range(10):
while not rate_limiter.consume():
time.sleep(0.1)
print(f'Request {i + 1} processed. Tokens left: {int(rate_limiter.tokensc)}')
my_func()
執行結果:
Request 1 processed. Tokens left: 9
Request 2 processed. Tokens left: 8
Request 3 processed. Tokens left: 7
Request 4 processed. Tokens left: 6
Request 5 processed. Tokens left: 5
Request 6 processed. Tokens left: 4
Request 7 processed. Tokens left: 3
Request 8 processed. Tokens left: 2
Request 9 processed. Tokens left: 1
Request 10 processed. Tokens left: 0
Request 1 processed. Tokens left: 0
Request 2 processed. Tokens left: 0
Request 3 processed. Tokens left: 0
...
perf_counter(), perf_counter_ns()
perf_counter()
具有最高可用解析度的時鐘 (最精確的可用計時器),常用於測量短的持續時間。 返回 Performance Counter (效能計數器) 的值 (以秒為單位)。返回值的參考點是未定義的,因此只有兩次調用結果之間的差異才有效。
perf_counter_ns()
與 perf_counter()
類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。
process_time(), process_time_ns()
process_time()
返回當前 process 的系統和使用者 CPU 時間的總和。 它不包括睡眠期間經過的時間 (sleep()
)。 返回值的參考點是未定義的,因此只有兩次調用結果之間的差異才有效。
process_time_ns()
與 process_time()
類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。
import time
# Start the timers
start_process_time = time.process_time()
start_perf_counter = time.perf_counter()
# Run a loop for a bit
for _ in range(1000000):
pass
# Sleep for a bit
time.sleep(2)
# Stop the timers
end_process_time = time.process_time()
end_perf_counter = time.perf_counter()
print('process_time:', end_process_time - start_process_time)
# Output: 0.0156 s
print('perf_counter:', end_perf_counter - start_perf_counter)
# Output: 2.0361 s
thread_time(), thread_time_ns()
返回當前 thread 的系統和使用者 CPU 時間之和的值 (以秒為單位)。 它不包括睡眠期間經過的時間 (sleep()
)。返回值的參考點是未定義的,因此只有同一 thread 中兩次調用結果之間的差異才有效。
thread_time_ns()
與 thread_time()
類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。
import time
import concurrent.futures
def cpu_intensive_task(thread_num):
start_time = time.thread_time()
for i in range(10**7): # A simple CPU intensive task
pass
end_time = time.thread_time()
return f"Thread time taken by cpu_intensive_task in Worker-{thread_num}: {end_time - start_time} sec"
# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Use list comprehension to create futures
futures = [executor.submit(cpu_intensive_task, i) for i in range(5)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
執行結果:
Thread time taken by cpu_intensive_task in Worker-0: 0.09375 sec
Thread time taken by cpu_intensive_task in Worker-2: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-1: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-3: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-4: 0.109375 sec
Last updated
Was this helpful?