
full(), empty(), qsize()


full() checks whether the queue is full. It returns True if the queue is full, and False otherwise.

The Queue object must be initialized with the maxsize argument; otherwise full() always returns False, because the queue is never considered full.

PYTHON
from queue import Queue

# Create queue
q = Queue(maxsize=5)

# Add elements
for i in range(5):
    q.put(i)

print(q.full())  # Output: True
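
The claim above that full() always returns False without maxsize can be seen directly; a minimal sketch:

```python
from queue import Queue

# No maxsize: the queue is unbounded
q = Queue()

# Even after adding many items...
for i in range(100):
    q.put(i)

# ...an unbounded queue is never considered full
print(q.full())  # Output: False
```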

empty() returns True if the queue is empty, and False otherwise.

PYTHON
import queue

# Create a new queue
my_queue = queue.Queue()

# Check queue
if my_queue.empty():
    print("The queue is empty.")  # This branch runs for a new queue
else:
    print("The queue is not empty.")

qsize() returns the approximate size of the queue. It is typically used to check how many items are currently in the queue.

PYTHON
import queue

# Create queue
q = queue.Queue()

# Add elements
for i in range(5):
    q.put(i)

print(q.qsize())  # Output: 5
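
The word "approximate" matters in multi-threaded code: between a call to qsize() and a later get(), another thread may have taken the item. The sketch below (the thief thread and the two Event objects are purely illustrative) forces exactly that interleaving:

```python
import queue
import threading

q = queue.Queue()
q.put("item")

checked = threading.Event()  # set after the main thread reads qsize()
stolen = threading.Event()   # set after the other thread takes the item

def thief():
    checked.wait()   # wait until qsize() has been checked
    q.get()          # take the only item
    stolen.set()

t = threading.Thread(target=thief)
t.start()

if q.qsize() > 0:    # True at this moment...
    checked.set()
    stolen.wait()
    try:
        q.get(timeout=0.1)   # ...but the item is already gone
    except queue.Empty:
        print("qsize() was stale")
t.join()
```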

Example – The producer-consumer problem


The producer-consumer problem is a classic synchronization problem, and a good setting for understanding how these queue methods behave in a multi-threading context.

Deadlock scenario: consumer() runs an infinite while True: loop, so it keeps executing for as long as the program runs. Even when the queue is empty, consumer() blocks in q.get() waiting for an item that never arrives, and the program deadlocks.

You can avoid this with a sentinel value: producer() puts None into the queue once it has finished producing, and consumer() checks each item it gets, breaking out of the loop when the item is None.

PYTHON
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

def producer(q):
    for i in range(10):
        print("Producing", i)
        q.put(i)          # put() blocks if the queue is full
    q.put(None)           # Sentinel value

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Check for sentinel value
            q.task_done()
            break
        print(f"Consumed {item}")
        q.task_done()

q = Queue(maxsize=5)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)

# Block until all tasks are done
q.join()

Output:

Producing 0
Producing 1
Consumed 0
Producing 2
Consumed 1
...

Example – Feedback loop


If you want the producer and consumer to stay in step, so that the producer only produces a new item after the previous one has been consumed, one way is to use a second queue through which the consumer signals the producer each time an item is consumed. This is sometimes called a feedback loop.

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue  # feedback

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            self.feedback_queue.get()  # Wait for feedback
            time.sleep(1)              # Simulate time-consuming work
            print(f"Produced {count}")
            count += 1

class Consumer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue

    def consume(self):
        while True:
            # Busy-wait: a blocking self.queue.get() would be more idiomatic
            if self.queue.qsize() > 0:
                data = self.queue.get()
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)               # Simulate time-consuming work
                print(f"Consumed {data}")

q = queue.Queue()
feedback_q = queue.Queue()

producer = Producer(q, feedback_q)
consumer = Consumer(q, feedback_q)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)

Output:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...
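
A second queue is one way to get this lock-step behavior. As a rough sketch, join() and task_done() (covered later on this page) can do the same job without a feedback queue, because join() blocks until every item put so far has been marked done. Note that "Consumed" is logged before "Produced" here, since the producer only logs after the item has been fully processed:

```python
import threading
from queue import Queue

q = Queue()

def consumer():
    for _ in range(3):
        item = q.get()
        print(f"Consumed {item}")
        q.task_done()

t = threading.Thread(target=consumer)
t.start()

for i in range(3):
    q.put(i)
    q.join()                 # block until the item is marked done
    print(f"Produced {i}")
t.join()
```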

The same feedback-loop idea extends to batches: the producer emits batch_size items at a time and waits for one feedback signal per batch.

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    """
    Represents a producer that generates a specified number of items in batches.

    Attributes:
        queue (queue.Queue): The queue to put items into.
        feedback_queue (queue.Queue): A queue used for receiving feedback from the consumer.
        batch_size (int): The number of items to produce in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def produce(self):
        """
        Continuously produces items in batches of size batch_size and puts them into the queue.
        Waits for feedback from the consumer before producing the next batch.
        """ 
        while True:
            count = 0
            for _ in range(self.batch_size):
                self.queue.put(count)
                count += 1
            self.feedback_queue.get()  # Wait for feedback
            time.sleep(1)              # Simulate time-consuming work
            print(f"Produced {count - self.batch_size} to {count - 1}")
            
class Consumer():
    """
    Represents a consumer that consumes items from a queue in batches.

    Attributes:
        queue (queue.Queue): The queue to get items from.
        feedback_queue (queue.Queue): A queue used for sending feedback to the producer.
        batch_size (int): The number of items to consume in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def consume(self):
        """
        Continuously consumes items from the queue in batches of size batch_size.
        Sends feedback to the producer after consuming each batch.
        """
        while True:
            # Busy-wait until a full batch is available
            if self.queue.qsize() >= self.batch_size:
                batch_data = []
                for _ in range(self.batch_size):
                    batch_data.append(self.queue.get())
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)               # Simulate time-consuming work
                print(f"Consumed {batch_data}")

q = queue.Queue()
feedback_q = queue.Queue()

batch_size = 5
producer = Producer(q, feedback_q, batch_size)
consumer = Consumer(q, feedback_q, batch_size)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)

Output:

Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...

Example – Monitoring the queue size


Although qsize() is not always reliable for an exact count, it is useful when you need to monitor the queue size in a non-critical way, for example to log the queue size periodically, or to trigger an action when the queue exceeds a certain size.

Suppose you have a system where a producer adds tasks to a queue and a consumer removes them. If the queue grows too large, it may indicate that the consumer cannot keep up with the producer, which can be a sign of trouble.

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer:
    def __init__(self, queue):
        self.queue = queue

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            count += 1
            time.sleep(0.5)
            print(f"Produced {count}")  # count is the total produced so far

class Consumer:
    def __init__(self, queue):
        self.queue = queue

    def consume(self):
        while True:
            # Busy-wait: a blocking get() would be more idiomatic
            if self.queue.qsize() > 0:
                data = self.queue.get()
                time.sleep(1)
                print(f"Consumed {data}")

class Monitor:
    def __init__(self, queue):
        self.queue = queue

    def monitor_queue_size(self):
        while True:
            size = self.queue.qsize()
            print(f"Queue size: {size}")
            if size > 5:
                print("Warning: Queue size exceeded 5!")
            time.sleep(2)  # Check queue size

q = queue.Queue()

producer = Producer(q)
consumer = Consumer(q)
monitor = Monitor(q)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
    executor.submit(monitor.monitor_queue_size)

Output:

Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 5!
Consumed 5
Produced 12
Produced 13
...

join(), task_done()


join() blocks the calling thread until all items in the queue have been received and processed (that is, until task_done() has been called for each item). If task_done() is never called, q.join() blocks indefinitely.

PYTHON
join()
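
A minimal single-threaded sketch of this contract: join() returns only once task_done() has been called for every item that was put:

```python
import queue

q = queue.Queue()
q.put("job")

item = q.get()   # fetch the item
# ... process the item ...
q.task_done()    # without this call, q.join() below would block forever

q.join()         # returns immediately: every item is accounted for
print("All tasks done")
```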

task_done() indicates that a previously enqueued task is complete; it tells the queue that the item has been processed. It is often used together with multi-threading or multi-processing.

PYTHON
task_done()

When a consumer (worker) thread finishes processing an item, it should call task_done(). This matters especially when you use join(), which blocks the thread until every item in the queue has been fetched and processed (meaning task_done() has been called for each item).

The Queue class is commonly used for communication and coordination between threads, where one thread (the producer) adds items to the queue and another thread (the consumer) takes them out.

Example


PYTHON
import concurrent.futures
import queue
import time

def producer(q):
    print("Starting producer...")
    for i in range(10):
        print(f"Producing {i}")
        q.put(i)
        time.sleep(0.2)
    q.put(None)           # Sentinel value
    print("[Produced]")

def consumer(q):
    print("Starting consumer...")
    while True:
        item = q.get()
        if item is None:      # Check for sentinel value
            q.task_done()
            break
        time.sleep(0.2)
        q.task_done()
        print(f"Consumed {item}")

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)
    q.join()
print("All tasks have been completed.")

Output:

Starting producer...
Producing 0
Starting consumer...
Producing 1
Consumed 0
Producing 2
Consumed 1
Consumed 2
Producing 3
Producing 4
...

task_done() and join() also work with a pool of workers draining a pre-filled queue:

PYTHON
import concurrent.futures
import queue
import time

def worker(q):
    while True:
        try:
            # get_nowait() avoids the race where empty() was False but
            # another worker grabbed the last item before our get()
            item = q.get_nowait()
        except queue.Empty:
            break
        time.sleep(0.1)
        print(f"Completed item: {item}")
        q.task_done()

def main():
    # Create queue and populate it with items
    q = queue.Queue()
    for i in range(20):
        q.put(i)

    # Create worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit one task per worker
        for _ in range(5):
            executor.submit(worker, q)
        
        # Wait for all the tasks
        q.join()
        print("All tasks have been processed.")

if __name__ == "__main__":
    main()

Output:

Completed item: 4
Completed item: 2
Completed item: 3
Completed item: 0
Completed item: 1
....
Completed item: 19
Completed item: 18
Completed item: 15
Completed item: 17
Completed item: 16
All tasks have been processed.

