XiWind 西風之劍
HomeTechProContactGitHub
  • About
  • Git
    • Windows Terminal、PowerShell 安裝
    • Git 開始使用
    • Branch 入門
    • 合併多個 Commit , 編輯
    • 額外功能
  • deep learning
    • Dilated Convolution
  • Python
    • GIL 【全域直譯器鎖】
    • PyPy 【JIT 編譯器】
    • Decorator 【修飾器】
      • Class Decorators
  • Python library
    • abc 【抽象 Class】
      • ABC, ABCMeta
      • __abstractmethods__, get_cache_token, update_abstractmethods
    • dataclasses 【數據 Class】
      • make_dataclass(), replace(), is_dataclass(), __post_init__
    • enum 【列舉 Class】
      • Flag, auto(), unique, verify()
      • 範例
    • concurrent.futures 【執行緒、程序】
      • Future, Module Functions
    • queue 【佇列、同步】
      • full(), empty(), qsize(), join(), task_done()
    • functools 【可調用物件】
      • ordering、wrapper、partial
      • Overloading
    • heapq 【堆積佇列】
      • heapify(), merge(), nlargest(), nsmallest()
    • time 【時間】
      • time(), monotonic(), perf_counter()...
      • sleep(), 範例...
    • logging 【日誌】
Powered by GitBook
On this page
  • full(), empty(), qsize()
  • 範例 – 生產者-消費者問題
  • 範例 – Feedback loop
  • 範例 – 監控 Queue 的大小
  • join(), task_done()
  • 範例
  • 參考資料

Was this helpful?

  1. Python library
  2. queue 【佇列、同步】

full(), empty(), qsize()


full() 用於檢查隊列是否已滿。 如果隊列已滿則返回 True,否則返回 False。

Queue 物件必須使用 maxsize 參數進行初始化,否則,full() 將始終返回 False,因為 queue 永遠不會被視為已滿。

PYTHON
from queue import Queue

# Create queue
q = Queue(maxsize=5)

# Add elements
for i in range(5):
    q.put(i)

print(q.full())  # Output: True

empty() 如果 queue 為空返回 True,否則返回 False。

PYTHON
import queue

# Create a new queue
my_queue = queue.Queue()

# Check queue
if my_queue.empty():
    print("The queue is empty.")  # ✔️✔️
else:
    print("The queue is not empty.")

qsize() 是一個返回 queue 大致大小,通常用於檢查當前 queue 中的項目數量。

import queue

# Create queue
q = queue.Queue()

# Add elements
for i in range(5):
    q.put(i)

print(q.qsize())  # Output: 5

由於 multithreading semantics (多線程語義),此計數不可靠,並且由於與 parallel processing 相關的固有競爭條件,不能保證準確的計數。

因此不應依賴它們來精確追蹤 queue 大小,而是作為粗略指南。

範例 – 生產者-消費者問題


生產者-消費者問題(Producer-Consumer problem)是一個經典的同步問題,非常適合在multi-threading context 中理解 qsize() 方法。

Deadlock 情況: consumer() 是一個無限迴圈 while True:,只要程序在運行,它就會繼續執行。 因此即使 queue 為空,consumer() 也會繼續運行,導致 deadlock。

PYTHON
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

def producer(q):
    for i in range(10):
        if not q.full():
            print("Producing", i)
            q.put(i)
    q.put(None)           # Sentinel value

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Check for sentinel value
            q.task_done()
            break
        print(f"Consumed {item}")
        q.task_done()

q = Queue(maxsize=5)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)

# Block until all tasks are done
q.join()

執行結果:

Producing 0
Producing 1
Consuming 0
Producing 2
Consuming 1
...

範例 – Feedback loop


如果您希望 Producer 和 Consumer 始終相等,使 Producer 僅在消耗了前一個物品後才生產新項目。 實現此目的的一種方法是使用另一個 queue 在某個項目被消耗時,向 Producer 發出信號。 這有時被稱為 feedback loop (反饋循環)。

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue  # feedback

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            count += 1
            self.feedback_queue.get()  # Wait until feedback
            time.sleep(1)         # Simulation time-consuming
            print(f"Produced {count}")
			
class Consumer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue

    def consume(self):
        while True:
            if self.queue.qsize() > 0:
                data = self.queue.get()
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)     # Simulation time-consuming
                print(f"Consumed {data}")

q = queue.Queue()
feedback_q = queue.Queue()

producer = Producer(q, feedback_q)
consumer = Consumer(q, feedback_q)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)

執行結果:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    """
    Represents a producer that generates a specified number of items in batches.

    Attributes:
        queue (queue.Queue): The queue to put items into.
        feedback_queue (queue.Queue): A queue used for receiving feedback from the consumer.
        batch_size (int): The number of items to produce in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def produce(self):
        """
        Continuously produces items in batches of size batch_size and puts them into the queue.
        Waits for feedback from the consumer before producing the next batch.
        """ 
        while True:
            count = 0
            for _ in range(self.batch_size):
                self.queue.put(count)
                count += 1
            self.feedback_queue.get()  # Wait until feedback
            time.sleep(1)              # Simulation time-consuming
            print(f"Produced {count - self.batch_size} to {count - 1}")
            
class Consumer():
    """
    Represents a consumer that consumes items from a queue in batches.

    Attributes:
        queue (queue.Queue): The queue to get items from.
        feedback_queue (queue.Queue): A queue used for sending feedback to the producer.
        batch_size (int): The number of items to consume in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def consume(self):
        """
        Continuously consumes items from the queue in batches of size batch_size.
        Sends feedback to the producer after consuming each batch.
        """
        while True:
            if self.queue.qsize() >= self.batch_size:
                batch_data = []
                for _ in range(self.batch_size):
                    batch_data.append(self.queue.get())
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)  # Simulation time-consuming
                print(f"Consumed {batch_data}")

q = queue.Queue()
feedback_q = queue.Queue()

batch_size = 5
producer = Producer(q, feedback_q, batch_size)
consumer = Consumer(q, feedback_q, batch_size)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)

執行結果:

Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...

範例 – 監控 Queue 的大小


儘管 qsize() 對於精確計數並不總是可靠,但它在您需要以非關鍵的方式監視 queue 大小的情況下很有用。例如:定期 log queue 大小、queue 超過一定大小時觸發 action。

假設您有一個系統,Producer 將任務添加到 queue 中,而 Consumer 將任務從 queue 中刪除。 如果 queue 變得太大,可能表明 Consumer 無法跟上 Producer,這可能是出現問題的徵兆。

PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer:
    def __init__(self, queue):
        self.queue = queue

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            count += 1
            time.sleep(0.5)
            print(f"Produced {count}")

class Consumer:
    def __init__(self, queue):
        self.queue = queue

    def consume(self):
        while True:
            if self.queue.qsize() > 0:
                data = self.queue.get()
                time.sleep(1)
                print(f"Consumed {data}")

class Monitor:
    def __init__(self, queue):
        self.queue = queue

    def monitor_queue_size(self):
        while True:
            size = self.queue.qsize()
            print(f"Queue size: {size}")
            if size > 5:
                print("Warning: Queue size exceeded 5!")
            time.sleep(2)  # Check queue size

q = queue.Queue()

producer = Producer(q)
consumer = Consumer(q)
monitor = Monitor(q)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
    executor.submit(monitor.monitor_queue_size)

執行結果:

Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 10!
Consumed 5
Produced 12
Produced 13
...

join(), task_done()


join() 用於阻塞 main thread,直到接收並處理 queue 中的所有項目(即,每個項目調用 task_done()。) 如果不調用 task_done(),q.join() 將無限期地阻塞。

PYTHON
join()

task_done() 指示先前 enqueued 任務已完成(告訴 queue 該項目已被處理)。它經常與 multi-threading 或 multi-processing 結合使用。

PYTHON
task_done()

當 Producer thread 完成任務時,應該調用 task_done()。 這很重要,特別是當您使用 join() 時,它會 block thread,直到 queue 中的所有項目都已獲取並處理完畢(意味著已為每個項目調用 task_done())。

Queue Class 常用於 threads 間通信和協調,其中一個 thread 將項目添加到 queue (producer),另一個 thread 從 queue (consumer) 獲取項目。

範例


PYTHON
import concurrent.futures
import queue
import time

def producer(queue):
    # Create queue and populate it with items
    print("Starting producer...")
    for i in range(10):
        print(f"Producing {i}")
        queue.put(i)
        time.sleep(0.2)
    queue.put(None)           # Sentinel value
    print(f"[Produced]")

def consumer(queue):
    print("Starting consumer...")
    while True:
        item = queue.get()
        if item is None:      # Check for sentinel value
            queue.task_done()
            break
        time.sleep(0.2)
        queue.task_done()
        print(f"Consumed {item}")

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)
    q.join()
print("All tasks have been completed.")

執行結果:

Starting producer...
Producing 0
Starting consumer...
Producing 1
Consumed 0
Producing 2
Consumed 1
Consumed 2
Producing 3
Producing 4
...

PYTHON
import concurrent.futures
import queue
import time

def worker(q):
    while not q.empty():
        item = q.get()
        time.sleep(0.1)
        print(f"Completed item: {item}")
        q.task_done()

def main():
    # Create queue and populate it with items
    q = queue.Queue()
    for i in range(20):
        q.put(i)

    # Create worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
         # Submit tasks (Each worker)
        workers = [executor.submit(worker, q) for _ in range(5)]
        
        # Wait for all the tasks
        q.join()
        print("All tasks have been processed.")

if __name__ == "__main__":
    main()

執行結果:

Completed item: 4
Completed item: 2
Completed item: 3
Completed item: 0
Completed item: 1
....
Completed item: 19
Completed item: 18
Completed item: 15
Completed item: 17
Completed item: 16
All tasks have been processed.

參考資料


Previousqueue 【佇列、同步】Nextfunctools 【可調用物件】

Last updated 1 year ago

Was this helpful?

您可以透過使用 None 作為標記值 (),producer() 在完成生產後會將 None 放入 queue 中。 consumer() 檢查獲取的每個項目,如果該項目為 None,則它會跳出迴圈。

Sentinel value
queue --- 同步佇列 (queue) class(類別) — Python 3.11.4 說明文件
Python - queue.task_done() 用於什麼? - 堆棧溢出
Python daemon thead 解說 | My.APOLLO
[Python] Daemon Thread | 羅倫斯的IT航海日誌 - 點部落
在 Python 中排隊 task_done() 和 join()
Python 中的優先級隊列 - GeeksforGeeks
Page cover image