full(), empty(), qsize()
full()
用於檢查隊列是否已滿。 如果隊列已滿則返回 True,否則返回 False。
Queue 物件必須使用 maxsize
參數進行初始化,否則,full()
將始終返回 False
,因為 queue 永遠不會被視為已滿。
from queue import Queue
# Create queue
q = Queue(maxsize=5)
# Add elements
for i in range(5):
q.put(i)
print(q.full()) # Output: True
empty()
如果 queue 為空返回 True
,否則返回 False
。
import queue
# Create a new queue
my_queue = queue.Queue()
# Check queue
if my_queue.empty():
print("The queue is empty.") # ✔️✔️
else:
print("The queue is not empty.")
qsize()
是一個返回 queue 大致大小,通常用於檢查當前 queue 中的項目數量。
import queue
# Create queue
q = queue.Queue()
# Add elements
for i in range(5):
q.put(i)
print(q.qsize()) # Output: 5
由於 multithreading semantics (多線程語義),此計數不可靠,並且由於與 parallel processing 相關的固有競爭條件,不能保證準確的計數。
因此不應依賴它們來精確追蹤 queue 大小,而是作為粗略指南。
範例 – 生產者-消費者問題
生產者-消費者問題(Producer-Consumer problem)是一個經典的同步問題,非常適合在multi-threading context 中理解 qsize()
方法。
Deadlock 情況: consumer()
是一個無限迴圈 while True:
,只要程序在運行,它就會繼續執行。 因此即使 queue 為空,consumer()
也會繼續運行,導致 deadlock。
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
def producer(q):
for i in range(10):
if not q.full():
print("Producing", i)
q.put(i)
q.put(None) # Sentinel value
def consumer(q):
while True:
item = q.get()
if item is None: # Check for sentinel value
q.task_done()
break
print(f"Consumed {item}")
q.task_done()
q = Queue(maxsize=5)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, q)
executor.submit(consumer, q)
# Block until all tasks are done
q.join()
執行結果:
Producing 0
Producing 1
Consuming 0
Producing 2
Consuming 1
...
範例 – Feedback loop
如果您希望 Producer 和 Consumer 始終相等,使 Producer 僅在消耗了前一個物品後才生產新項目。 實現此目的的一種方法是使用另一個 queue 在某個項目被消耗時,向 Producer 發出信號。 這有時被稱為 feedback loop (反饋循環)。
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer():
def __init__(self, queue, feedback_queue):
self.queue = queue
self.feedback_queue = feedback_queue # feedback
def produce(self):
count = 0
while True:
self.queue.put(count)
count += 1
self.feedback_queue.get() # Wait until feedback
time.sleep(1) # Simulation time-consuming
print(f"Produced {count}")
class Consumer():
def __init__(self, queue, feedback_queue):
self.queue = queue
self.feedback_queue = feedback_queue
def consume(self):
while True:
if self.queue.qsize() > 0:
data = self.queue.get()
self.feedback_queue.put(1) # Send feedback
time.sleep(1) # Simulation time-consuming
print(f"Consumed {data}")
q = queue.Queue()
feedback_q = queue.Queue()
producer = Producer(q, feedback_q)
consumer = Consumer(q, feedback_q)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer.produce)
executor.submit(consumer.consume)
執行結果:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer():
"""
Represents a producer that generates a specified number of items in batches.
Attributes:
queue (queue.Queue): The queue to put items into.
feedback_queue (queue.Queue): A queue used for receiving feedback from the consumer.
batch_size (int): The number of items to produce in each batch.
"""
def __init__(self, queue, feedback_queue, batch_size):
self.queue = queue
self.feedback_queue = feedback_queue
self.batch_size = batch_size
def produce(self):
"""
Continuously produces items in batches of size batch_size and puts them into the queue.
Waits for feedback from the consumer before producing the next batch.
"""
while True:
count = 0
for _ in range(self.batch_size):
self.queue.put(count)
count += 1
self.feedback_queue.get() # Wait until feedback
time.sleep(1) # Simulation time-consuming
print(f"Produced {count - self.batch_size} to {count - 1}")
class Consumer():
"""
Represents a consumer that consumes items from a queue in batches.
Attributes:
queue (queue.Queue): The queue to get items from.
feedback_queue (queue.Queue): A queue used for sending feedback to the producer.
batch_size (int): The number of items to consume in each batch.
"""
def __init__(self, queue, feedback_queue, batch_size):
self.queue = queue
self.feedback_queue = feedback_queue
self.batch_size = batch_size
def consume(self):
"""
Continuously consumes items from the queue in batches of size batch_size.
Sends feedback to the producer after consuming each batch.
"""
while True:
if self.queue.qsize() >= self.batch_size:
batch_data = []
for _ in range(self.batch_size):
batch_data.append(self.queue.get())
self.feedback_queue.put(1) # Send feedback
time.sleep(1) # Simulation time-consuming
print(f"Consumed {batch_data}")
q = queue.Queue()
feedback_q = queue.Queue()
batch_size = 5
producer = Producer(q, feedback_q, batch_size)
consumer = Consumer(q, feedback_q, batch_size)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer.produce)
executor.submit(consumer.consume)
執行結果:
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...
範例 – 監控 Queue 的大小
儘管 qsize()
對於精確計數並不總是可靠,但它在您需要以非關鍵的方式監視 queue 大小的情況下很有用。例如:定期 log queue 大小、queue 超過一定大小時觸發 action。
假設您有一個系統,Producer 將任務添加到 queue 中,而 Consumer 將任務從 queue 中刪除。 如果 queue 變得太大,可能表明 Consumer 無法跟上 Producer,這可能是出現問題的徵兆。
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer:
def __init__(self, queue):
self.queue = queue
def produce(self):
count = 0
while True:
self.queue.put(count)
count += 1
time.sleep(0.5)
print(f"Produced {count}")
class Consumer:
def __init__(self, queue):
self.queue = queue
def consume(self):
while True:
if self.queue.qsize() > 0:
data = self.queue.get()
time.sleep(1)
print(f"Consumed {data}")
class Monitor:
def __init__(self, queue):
self.queue = queue
def monitor_queue_size(self):
while True:
size = self.queue.qsize()
print(f"Queue size: {size}")
if size > 5:
print("Warning: Queue size exceeded 5!")
time.sleep(2) # Check queue size
q = queue.Queue()
producer = Producer(q)
consumer = Consumer(q)
monitor = Monitor(q)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(producer.produce)
executor.submit(consumer.consume)
executor.submit(monitor.monitor_queue_size)
執行結果:
Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 10!
Consumed 5
Produced 12
Produced 13
...
join(), task_done()
join()
用於阻塞 main thread,直到接收並處理 queue 中的所有項目(即,每個項目調用 task_done()
。) 如果不調用 task_done()
,q.join()
將無限期地阻塞。
join()
task_done()
指示先前 enqueued 任務已完成(告訴 queue 該項目已被處理)。它經常與 multi-threading 或 multi-processing 結合使用。
task_done()
當 Producer thread 完成任務時,應該調用 task_done()
。 這很重要,特別是當您使用 join()
時,它會 block thread,直到 queue 中的所有項目都已獲取並處理完畢(意味著已為每個項目調用 task_done()
)。
Queue Class 常用於 threads 間通信和協調,其中一個 thread 將項目添加到 queue (producer),另一個 thread 從 queue (consumer) 獲取項目。
範例
import concurrent.futures
import queue
import time
def producer(queue):
# Create queue and populate it with items
print("Starting producer...")
for i in range(10):
print(f"Producing {i}")
queue.put(i)
time.sleep(0.2)
queue.put(None) # Sentinel value
print(f"[Produced]")
def consumer(queue):
print("Starting consumer...")
while True:
item = queue.get()
if item is None: # Check for sentinel value
queue.task_done()
break
time.sleep(0.2)
queue.task_done()
print(f"Consumed {item}")
q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, q)
executor.submit(consumer, q)
q.join()
print("All tasks have been completed.")
執行結果:
Starting producer...
Producing 0
Starting consumer...
Producing 1
Consumed 0
Producing 2
Consumed 1
Consumed 2
Producing 3
Producing 4
...
import concurrent.futures
import queue
import time
def worker(q):
while not q.empty():
item = q.get()
time.sleep(0.1)
print(f"Completed item: {item}")
q.task_done()
def main():
# Create queue and populate it with items
q = queue.Queue()
for i in range(20):
q.put(i)
# Create worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Submit tasks (Each worker)
workers = [executor.submit(worker, q) for _ in range(5)]
# Wait for all the tasks
q.join()
print("All tasks have been processed.")
if __name__ == "__main__":
main()
執行結果:
Completed item: 4
Completed item: 2
Completed item: 3
Completed item: 0
Completed item: 1
....
Completed item: 19
Completed item: 18
Completed item: 15
Completed item: 17
Completed item: 16
All tasks have been processed.
參考資料
Last updated
Was this helpful?