import time

start = time.monotonic()

# Simulating a long running process
time.sleep(5)

end = time.monotonic()
elapsed_time = end - start
print(f'The process took {elapsed_time} seconds to complete.')
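Example – Rate Limiter
A rate limiter (RateLimiter) is a tool that monitors how many requests an entity (for example, a user or an IP address) sends to a server or application within a given time window (for example, one minute or one second). If the requests exceed the limit, the rate limiter blocks every call beyond it. It is typically used to keep a server from being flooded with too many requests at once and so keep it healthy, for example to prevent abuse or reduce spam.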
import time


class SimpleRateLimiter:
    def __init__(self):
        # Earliest monotonic time at which the next request is allowed
        self.allow_request = time.monotonic()

    def is_allowed(self):
        current_time = time.monotonic()
        if self.allow_request <= current_time:
            # Allow this request and block further ones for 1 second
            self.allow_request = current_time + 1.0
            return True
        return False


def my_func():
    # print("Called!")
    ...


rate_limiter = SimpleRateLimiter()

# Simulate 20 requests
for i in range(20):
    time.sleep(0.5)  # Simulate processing tasks
    if rate_limiter.is_allowed():
        print("Execution user is allowed.")
        my_func()
    else:
        print("Execution user is blocked.")
Output:
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
Execution user is allowed.
Execution user is blocked.
...
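The simple limiter above keeps a single timestamp shared by every caller, whereas the description talks about counting requests per entity within a time window. A minimal sketch of that idea follows; the `PerKeyRateLimiter` class, its `max_requests` and `window_seconds` parameters, and the injectable `now` argument are all hypothetical names chosen for illustration, not part of the original article.

```python
import time
from collections import defaultdict, deque


class PerKeyRateLimiter:
    """Hypothetical sliding-window limiter: at most max_requests per key per window."""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # One deque of monotonic timestamps per entity (user, IP address, ...)
        self._calls = defaultdict(deque)

    def is_allowed(self, key, now=None):
        # `now` can be injected for testing; it defaults to the monotonic clock
        if now is None:
            now = time.monotonic()
        calls = self._calls[key]
        # Drop timestamps that have fallen out of the window
        while calls and now - calls[0] >= self.window_seconds:
            calls.popleft()
        if len(calls) < self.max_requests:
            calls.append(now)
            return True
        return False


limiter = PerKeyRateLimiter(max_requests=2, window_seconds=1.0)
for user in ("alice", "alice", "alice", "bob"):
    status = "allowed" if limiter.is_allowed(user) else "blocked"
    print(f"{user} is {status}.")
```

Because the four calls happen back to back, the third request from "alice" should be blocked while "bob" is still allowed, since each key has its own window. Storing every timestamp costs memory proportional to `max_requests` per key, which is a trade-off this sketch accepts for simplicity.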
Rate Limiter – Full Version
PYTHON
import time


class RateLimiter:
    def __init__(self, max_calls_per_second):
        self.max_calls_per_second = max_calls_per_second
        self.last_call = None  # None means "not called yet"

    def call(self, func, *args, **kwargs):
        now = time.monotonic()
        # Reject the call if the minimum interval has not elapsed yet
        if self.last_call is not None and (now - self.last_call < 1 / self.max_calls_per_second):
            return "Rate limit exceeded"
        else:
            self.last_call = now
            return func(*args, **kwargs)


rate_limiter = RateLimiter(1)  # Allow 1 call per second


def my_func():
    return "Called!"


# Call the function within the rate limit
print(rate_limiter.call(my_func))  # Outputs: "Called!"
print(rate_limiter.call(my_func))  # Outputs: "Rate limit exceeded"
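Choosing between a decorator class and a decorator function usually comes down to personal preference, the specifics of the use case, or style.
If the decorator needs to maintain state between calls to the decorated function, or to offer additional methods for interacting with it, a decorator class may be the better fit.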
PYTHON
import time


def rate_limited(max_calls_per_second):
    # None means "not called yet"; time.monotonic() has an undefined
    # reference point, so 0.0 is not a reliable starting value
    last_called = None

    def decorator(func):
        def wrapper(*args, **kwargs):
            nonlocal last_called
            now = time.monotonic()
            # Enforce the minimum interval between calls
            if last_called is not None and now - last_called < 1 / max_calls_per_second:
                return "Rate limit exceeded"
            else:
                last_called = now
                return func(*args, **kwargs)
        return wrapper
    return decorator


@rate_limited(1)  # Allow 1 call per second
def my_func():
    return "Called!"


# Call the function within the rate limit
print(my_func())  # Outputs: "Called!"
time.sleep(1)
print(my_func())  # Outputs: "Called!"
print(my_func())  # Outputs: "Rate limit exceeded"
PYTHON
import time


class RateLimiter:
    def __init__(self, max_calls_per_second):
        self.max_calls_per_second = max_calls_per_second
        # None means "not called yet"; time.monotonic() has an undefined
        # reference point, so 0 is not a reliable starting value
        self.last_call = None
        self.__count = 0

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            if self.last_call is not None and now - self.last_call < 1 / self.max_calls_per_second:
                return "Rate limit exceeded"
            else:
                self.__count += 1
                self.last_call = now
                return func(*args, **kwargs)
        return wrapper

    @property
    def counter(self):
        # Number of calls that were actually let through
        return self.__count


limiter = RateLimiter(1)  # Allow 1 call per second


@limiter
def my_func():
    return "Called!"


# Call the function within the rate limit
print(my_func())  # Outputs: "Called!"
time.sleep(1)
print(my_func())  # Outputs: "Called!"
print(my_func())  # Outputs: "Rate limit exceeded"
print(limiter.counter)  # Outputs: 2
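One detail the decorators above leave out is that `wrapper` replaces the decorated function's name and docstring. The sketch below, which assumes a hypothetical `CountingLimiter` class with an illustrative `reset()` method (neither comes from the original article), shows how `functools.wraps` can keep that metadata while the class still exposes extra methods for interacting with its state.

```python
import functools
import time


class CountingLimiter:
    """Hypothetical decorator class: rate-limits calls and exposes counter/reset()."""

    def __init__(self, max_calls_per_second):
        self.min_interval = 1 / max_calls_per_second
        self.last_call = None  # None means "not called yet"
        self._count = 0

    def __call__(self, func):
        @functools.wraps(func)  # keep func.__name__ and func.__doc__
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            if self.last_call is not None and now - self.last_call < self.min_interval:
                return "Rate limit exceeded"
            self._count += 1
            self.last_call = now
            return func(*args, **kwargs)
        return wrapper

    @property
    def counter(self):
        return self._count

    def reset(self):
        # Forget the previous call so the next one is allowed immediately
        self.last_call = None
        self._count = 0


limiter = CountingLimiter(1)  # Allow 1 call per second


@limiter
def greet():
    """Return a greeting."""
    return "Hello!"


print(greet.__name__)   # greet
print(greet())          # Hello!
print(greet())          # Rate limit exceeded
limiter.reset()
print(greet())          # Hello!
print(limiter.counter)  # 1
```

Note that the state lives on the `limiter` instance, so decorating two functions with the same instance would make them share one rate limit and one counter. Creating a separate instance per function avoids that if it is not the intent.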
import time

# Start the timers
start_process_time = time.process_time()
start_perf_counter = time.perf_counter()

# Run a loop for a bit
for _ in range(1000000):
    pass

# Sleep for a bit
time.sleep(2)

# Stop the timers
end_process_time = time.process_time()
end_perf_counter = time.perf_counter()

print('process_time:', end_process_time - start_process_time)  # Output: 0.0156 s
print('perf_counter:', end_perf_counter - start_perf_counter)  # Output: 2.0361 s
thread_time(), thread_time_ns()
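Returns the sum of the system and user CPU time of the current thread: thread_time() as a float in seconds, thread_time_ns() as an integer in nanoseconds. Time elapsed during sleep (sleep()) is not included. The reference point of the returned value is undefined, so only the difference between the results of two calls made in the same thread is valid.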
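To make the "sleep is not counted" point concrete, here is a small sketch that measures the same stretch of code with `time.thread_time()`, `time.thread_time_ns()`, and `time.perf_counter()`. The sleep duration and the amount of CPU work are arbitrary values picked for illustration, and the exact figures will vary by machine.

```python
import time

start_cpu = time.thread_time()
start_cpu_ns = time.thread_time_ns()
start_wall = time.perf_counter()

time.sleep(0.2)             # Sleeping takes wall-clock time but almost no CPU time
total = sum(range(10**6))   # A little CPU work, which does count as thread time

cpu = time.thread_time() - start_cpu
cpu_ns = time.thread_time_ns() - start_cpu_ns
wall = time.perf_counter() - start_wall

print(f"thread_time:    {cpu:.4f} s")
print(f"thread_time_ns: {cpu_ns} ns")
print(f"perf_counter:   {wall:.4f} s")
```

The `perf_counter` difference should come out at roughly 0.2 seconds or more, while the thread-time difference should stay well below it, since only the summation consumes CPU time on this thread.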
Because of the Global Interpreter Lock (GIL), CPU-bound tasks running in Python threads do not execute in parallel; they run concurrently instead. See: concurrent.futures [Threads, Processes].
PYTHON
import time
import concurrent.futures


def cpu_intensive_task(thread_num):
    start_time = time.thread_time()
    for i in range(10**7):  # A simple CPU intensive task
        pass
    end_time = time.thread_time()
    return f"Thread time taken by cpu_intensive_task in Worker-{thread_num}: {end_time - start_time} sec"


# Create ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Use list comprehension to create futures
    futures = [executor.submit(cpu_intensive_task, i) for i in range(5)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
Output:
Thread time taken by cpu_intensive_task in Worker-0: 0.09375 sec
Thread time taken by cpu_intensive_task in Worker-2: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-1: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-3: 0.109375 sec
Thread time taken by cpu_intensive_task in Worker-4: 0.109375 sec
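The per-thread figures above do not show how long the whole batch took. As a rough sketch, the snippet below compares the summed thread CPU time with the wall-clock time for the batch; the `busy()` helper, the workload size, and the worker count are hypothetical choices for illustration. On a standard CPython build with the GIL the two values tend to be close, because only one thread executes Python bytecode at a time, but the actual numbers depend on the machine and the interpreter build.

```python
import time
import concurrent.futures


def busy(n):
    """Hypothetical CPU-bound helper: returns the CPU time this thread spent."""
    start = time.thread_time()
    for _ in range(n):
        pass
    return time.thread_time() - start


wall_start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Run the same CPU-bound task in four worker threads
    cpu_times = list(executor.map(busy, [10**6] * 4))
wall = time.perf_counter() - wall_start

print(f"Sum of thread CPU times: {sum(cpu_times):.4f} s")
print(f"Wall-clock time:         {wall:.4f} s")
```

If the threads truly ran in parallel, the wall-clock time would be expected to approach the time of a single task rather than the sum of all of them.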