Page cover

time(), time_ns()


time() 以 float 形式返回自紀元以來的時間(以秒為單位)。 閏秒的處理取決於平台。 在 Windows 和大多數 Unix 系統上,閏秒不計入自紀元以來的秒數中。 這通常稱為 Unix 時間。

time_ns()time() 類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。

PYTHON
import time

current_time = time.time()
print('The current time is', current_time) # Output: 1688807663.873429

start_time = time.time()
time.sleep(1)
end_time = time.time()

print(f"{end_time - start_time:.4f} s") # Output: 1.0123 s

monotonic(), monotonic_ns()


monotonic() 返回單調遞增時鐘 (Monotonic Clock),以秒為單位。 時鐘不受系統時鐘更新的影響。 返回值的參考點未定義,因此只有兩次調用結果之間的差異才有效。

這使得它在您想要測量經過時間而不必擔心由於閏秒、DST (Daylight Saving Time)或系統時鐘的其他更改,而導致的時間變化的情況下非常有用。

monotonic_ns()monotonic() 類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。

盡可能使用 monotonic() 來測量持續時間和超時,而不是 time() ,以避免調整系統時鐘時可能出現的細微錯誤。

PYTHON
import time

start = time.monotonic()
# Simulating a long running process
time.sleep(5)
end = time.monotonic()

elapsed_time = end - start
print(f'The process took {elapsed_time} seconds to complete.')

範例 – 速率限制器


速率限制器 (RateLimiter) 是一種工具,用於監視實體 (例如:使用者或 IP address) 在一定時間內 (例如:一分鐘或一秒) 向 server 或 application 發出的 request 數量。 如果 request 超過限制,則速率限制器會阻止所有超出的調用。通常用於防止 server 同時被太多 request 淹沒,來維持 server 健康。例如:防止濫用、減少垃圾郵件。

想像一下您有一個程序,其中某些部分每秒執行次數不得超過一次。 您可以實現一個簡單的速率限制器,來強制執行此限制。

SimpleRateLimiter Class 有一個 is_allowed 方法,用於檢查當前時間 (monotonic()) 是否大於或等於 self.allow_request

如果是,則意味著自上次允許執行以來已經過去了 1 秒 (或更多)。因此將 self.allow_request 設置為當前時間加 1 秒,並返回 True。 如果不是,則表示已經過去了不到 1 秒,所以返回 False

在循環中,我們嘗試每 0.5 秒執行一些程式碼。 但我們設置的速率限制器只允許它每 1 秒執行一次。 因此,一半的執行嘗試將被阻止。

這種簡單的速率限制器不允許流量突發。 無論之前的活動如何,它每秒只允許一個請求。

PYTHON
import time

class SimpleRateLimiter:
    def __init__(self):
        self.allow_request = time.monotonic()

    def is_allowed(self):
        current_time = time.monotonic()
        if self.allow_request <= current_time:
            self.allow_request = current_time + 1.0
            return True
        return False

def my_func():
    #print("Called!")
    ...

rate_limiter = SimpleRateLimiter()

# Simulate 20 requests
for i in range(20):
    time.sleep(0.5)  # Simulate processing tasks
    if rate_limiter.is_allowed():
        print(f"Execution user is allowed.")
        my_func()
    else:
        print(f"Execution user is blocked.")

執行結果:

Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
...

速率限制器 – 正式版


PYTHON
import time

class RateLimiter:
    def __init__(self, max_calls_per_second):
        self.max_calls_per_second = max_calls_per_second
        self.last_call = None

    def call(self, func, *args, **kwargs):
        now = time.monotonic()
        if self.last_call is not None and (now - self.last_call < 1 / self.max_calls_per_second):
            return "Rate limit exceeded"
        else:
            self.last_call = now
            return func(*args, **kwargs)

rate_limiter = RateLimiter(1)  # Allow 1 call per second

def my_func():
    return "Called!"

# Call the function within the rate limit
print(rate_limiter.call(my_func))  # Outputs: "Called!"
print(rate_limiter.call(my_func))  # Outputs: "Rate limit exceeded"

速率限制器 – Pythonic


在此示例中,rate_limited 是一個 decorator factory。 它將每秒最大調用次數作為輸入,並返回一個 decorator。

這種方法可以說更Pythonic,因為它將速率限制邏輯封裝到單個 decorator 中,然後只需在函數定義之前添加 @rate_limited(max_calls_per_second) 即可輕鬆應用於任何函數。

Decorators Class 和 Decorators function 之間的選擇,通常可以歸結為個人偏好、用例特殊性或風格。

如果 Decorator 需要在調用被 decorated function 之間維護狀態或提供額外的交互方法,則 decorator Class 可能更合適。

PYTHON
import time

def rate_limited(max_calls_per_second):
    last_called = 0.0
    def decorator(func):
        def wrapper(*args, **kwargs):
            nonlocal last_called
            now = time.monotonic()
            if now - last_called < 1 / max_calls_per_second: # Set certain rate 
                return "Rate limit exceeded"
            else:
                last_called = now
                return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limited(1)  # Allow 1 call per second
def my_func():
    return "Called!"

# Call the function within the rate limit
print(my_func())  # Outputs: "Called!"
time.sleep(1)
print(my_func())  # Outputs: "Called!"
print(my_func())  # Outputs: "Rate limit exceeded"
PYTHON
import time

class RateLimiter:
	def __init__(self, max_calls_per_second):
		self.max_calls_per_second = max_calls_per_second
		self.last_call = 0
		self.__count = 0

	def __call__(self, func):
		def wrapper(*args, **kwargs):
			now = time.monotonic()
			if now - self.last_call < 1 / self.max_calls_per_second:
				return "Rate limit exceeded"
			else:
				self.__count += 1
				self.last_call = now
				return func(*args, **kwargs)
		return wrapper

	@property
	def counter(self):
		return self.__count

limiter = RateLimiter(1)  # Allow 1 call per second

@limiter
def my_func():
	return "Called!"

# Call the function within the rate limit
print(my_func())	   # Outputs: "Called!"
time.sleep(1)
print(my_func())	   # Outputs: "Called!"
print(my_func())	   # Outputs: Rate limit exceeded

print(limiter.counter) # Outputs: 2

範例 – Token 存儲桶速率限制器


該程式碼實現了一種稱為 token bucket (桶) 的速率限制策略。重新填充的 Tokens 數量永遠不會超過 max_rate。 Request 消耗存儲桶中的 Token。 如果桶中沒有剩餘 Tokens,則 request 將被丟棄或排隊等待稍後處理。

如果 Token 被消耗,則返回 True。 如果自上次消耗 Token 以來,沒有重新填充足夠的 Token,則返回 False。 在這種情況下,程序會休眠一小段時間,然後再重試。

該算法跟蹤經過的時間和 Tokens,使其能夠順利處理突發性 requests。

PYTHON
from math import ceil
import time

class RateLimiter:
	def __init__(self, max_rate):
		self.max_rate = max_rate
		self.tokens = max_rate
		self.last_refill = time.monotonic()

	def consume(self):
		now = time.monotonic()
		elapsed = now - self.last_refill
		# Refills the token bucket
		self.tokens += elapsed * self.max_rate  
		# ensures that the number of tokens in the bucket never exceeds "max_rate"
		self.tokens = min(self.tokens, self.max_rate)
		if self.tokens < 1.0:
			return False

		self.tokens -= 1.0
		self.last_refill = now
		return True

	@property
	def tokensc(self):
		return self.tokens

	@tokensc.setter
	def tokensc(self, value):
		self.tokens = value
		self.max_rate = value

def my_func():
	#print("Called!")
	...

rate_limiter = RateLimiter(1.0)
rate_limiter.tokensc = 10.0

# Simulate 10 requests
for i in range(10):
	while not rate_limiter.consume():
		time.sleep(0.1)
	print(f'Request {i + 1} processed. Tokens left: {int(rate_limiter.tokensc)}')
	my_func()

rate_limiter.tokensc = 1.0

# Simulate 10 requests
for i in range(10):
	while not rate_limiter.consume():
		time.sleep(0.1)
	print(f'Request {i + 1} processed. Tokens left: {int(rate_limiter.tokensc)}')
	my_func()

執行結果:

Request 1 processed. Tokens left: 9
Request 2 processed. Tokens left: 8
Request 3 processed. Tokens left: 7
Request 4 processed. Tokens left: 6
Request 5 processed. Tokens left: 5
Request 6 processed. Tokens left: 4
Request 7 processed. Tokens left: 3
Request 8 processed. Tokens left: 2
Request 9 processed. Tokens left: 1
Request 10 processed. Tokens left: 0
Request 1 processed. Tokens left: 0
Request 2 processed. Tokens left: 0
Request 3 processed. Tokens left: 0
...

perf_counter(), perf_counter_ns()


perf_counter() 具有最高可用解析度的時鐘 (最精確的可用計時器),常用於測量短的持續時間。 返回 Performance Counter (效能計數器) 的值 (以秒為單位)。返回值的參考點是未定義的,因此只有兩次調用結果之間的差異才有效。

perf_counter_ns()perf_counter() 類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。

process_time(), process_time_ns()


process_time() 返回當前 process 的系統和使用者 CPU 時間的總和。 它不包括睡眠期間經過的時間 (sleep())。 返回值的參考點是未定義的,因此只有兩次調用結果之間的差異才有效。

process_time_ns()process_time() 類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。

PYTHON
import time

# Start the timers
start_process_time = time.process_time()
start_perf_counter = time.perf_counter()

# Run a loop for a bit
for _ in range(1000000):
    pass

# Sleep for a bit
time.sleep(2)

# Stop the timers
end_process_time = time.process_time()
end_perf_counter = time.perf_counter()

print('process_time:', end_process_time - start_process_time)
# Output: 0.0156 s

print('perf_counter:', end_perf_counter - start_perf_counter)
# Output: 2.0361 s

thread_time(), thread_time_ns()


返回當前 thread 的系統和使用者 CPU 時間之和的值 (以秒為單位)。 它不包括睡眠期間經過的時間 (sleep())。返回值的參考點是未定義的,因此只有同一 thread 中兩次調用結果之間的差異才有效。

thread_time_ns()thread_time() 類似,但以奈秒 (nanoseconds) 為單位。可以避免 float 帶來的精準度損失。

由於全域直譯器鎖 (GIL),在 Python thread 中,運行的 CPU 密集型任務不會 Parallel,而是會 Concurrency。詳見:concurrent.futures【執行緒、程序】

PYTHON
import time
import concurrent.futures

def cpu_intensive_task(thread_num):
    start_time = time.thread_time()
    for i in range(10**7): # A simple CPU intensive task
        pass
    end_time = time.thread_time()
    return f"Thread time taken by cpu_intensive_task in Worker-{thread_num}: {end_time - start_time} sec"

# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Use list comprehension to create futures
    futures = [executor.submit(cpu_intensive_task, i) for i in range(5)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

執行結果:

Thread time taken by cpu_intensive_task in Worker-0: 0.09375 sec
Thread time taken by cpu_intensive_task in Worker-2: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-1: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-3: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-4: 0.109375 sec

Last updated

Was this helpful?