# full(), empty(), qsize(), join(), task\_done()

## full(), empty(), qsize()

***

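`full()` checks whether the queue is full. It returns `True` if the queue is full and `False` otherwise.

The `Queue` object must be created with a positive `maxsize`; otherwise `full()` always returns `False`, because an unbounded queue is never considered full.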

{% code title="PYTHON" %}

```python
from queue import Queue

# Create queue
q = Queue(maxsize=5)

# Add elements
for i in range(5):
    q.put(i)

print(q.full())  # Output: True
```

{% endcode %}
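
For contrast, here is a minimal sketch of the unbounded case described above. The variable name `unbounded` is illustrative; the point is only that a queue created without `maxsize` never reports itself as full, however many items it holds.

{% code title="PYTHON" %}

```python
from queue import Queue

# No maxsize: the queue is unbounded
unbounded = Queue()

for i in range(1000):
    unbounded.put(i)

print(unbounded.full())   # Output: False
print(unbounded.qsize())  # Output: 1000
```

{% endcode %}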

`empty()` returns `True` if the queue is empty and `False` otherwise.

{% code title="PYTHON" %}

```python
import queue

# Create a new queue
my_queue = queue.Queue()

# Check queue
if my_queue.empty():
    print("The queue is empty.")  # ✔️✔️
else:
    print("The queue is not empty.")
```

{% endcode %}

`qsize()` returns the approximate size of the queue. It is typically used to check how many items are currently in the queue.

{% code title="PYTHON" %}

```python
import queue

# Create queue
q = queue.Queue()

# Add elements
for i in range(5):
    q.put(i)

print(q.qsize())  # Output: 5
```

{% endcode %}

{% hint style="warning" %}
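**Because of multithreading semantics, this count is not reliable: the race conditions inherent in concurrent access mean the value may already be stale by the time it is returned.**

The same applies to `empty()` and `full()`, so treat these methods as a rough guide rather than a precise way to track the queue size.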
{% endhint %}
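
One way to act on this advice is to attempt the operation and handle the failure, instead of checking first. The sketch below assumes a hypothetical helper named `drain()`; `get_nowait()` and the `queue.Empty` exception are part of the standard `queue` module.

{% code title="PYTHON" %}

```python
import queue

def drain(q):
    """Remove and return every item currently available, without blocking."""
    items = []
    while True:
        try:
            # Do not test q.empty() first: another thread could take the
            # last item between the check and the get().
            items.append(q.get_nowait())
        except queue.Empty:
            return items

q = queue.Queue()
for i in range(3):
    q.put(i)

print(drain(q))  # Output: [0, 1, 2]
print(drain(q))  # Output: []
```

{% endcode %}

With a single thread the result is the same as an `empty()` check, but this form stays correct when several threads read from the same queue.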

### Example – Producer-Consumer Problem

***

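The Producer-Consumer problem is a classic synchronization problem, and a good setting for seeing how queue-inspection methods such as `full()` and `qsize()` behave in a multi-threaded context.

**Deadlock case:** `consumer()` is an infinite `while True:` loop, so it keeps running for as long as the program does. Once the queue is empty and the producer has finished, `consumer()` blocks forever in `q.get()` and the program never exits.

You can avoid this by using `None` as a marker ([sentinel value](https://en.wikipedia.org/wiki/Sentinel_value)): `producer()` puts `None` on the queue when it has finished producing, and `consumer()` checks every item it gets and breaks out of the loop when the item is `None`.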

{% code title="PYTHON" %}

```python
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

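def producer(q):
    for i in range(10):
        if q.full():
            # Informational only: put() below blocks until a slot is free
            print("Queue is full, waiting...")
        print("Producing", i)
        q.put(i)              # Never skip an item; block instead
    q.put(None)               # Sentinel value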

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Check for sentinel value
            q.task_done()
            break
        print(f"Consumed {item}")
        q.task_done()

q = Queue(maxsize=5)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)

# Block until all tasks are done
q.join()

```

{% endcode %}

**Output:**

```txt
Producing 0
Producing 1
Consumed 0
Producing 2
Consumed 1
...
```
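
With more than one consumer, a single `None` stops only the consumer that happens to receive it. A minimal sketch, assuming a hypothetical `NUM_CONSUMERS` constant and a shared `results` list that are not part of the example above, puts one sentinel on the queue per consumer:

{% code title="PYTHON" %}

```python
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

NUM_CONSUMERS = 3  # Hypothetical value chosen for illustration

def producer(q):
    for i in range(10):
        q.put(i)
    for _ in range(NUM_CONSUMERS):
        q.put(None)           # One sentinel per consumer

def consumer(q, results):
    while True:
        item = q.get()
        if item is None:      # This consumer's sentinel
            q.task_done()
            break
        results.append(item)  # list.append is thread-safe in CPython
        q.task_done()

q = Queue(maxsize=5)
results = []

with ThreadPoolExecutor(max_workers=NUM_CONSUMERS + 1) as executor:
    executor.submit(producer, q)
    for _ in range(NUM_CONSUMERS):
        executor.submit(consumer, q, results)

q.join()  # Returns immediately: every item has been marked done
print(sorted(results))  # Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

{% endcode %}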

### Example – Feedback loop

***

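Suppose you want the producer and the consumer to stay in lockstep, so that the producer creates a new item only after the previous one has been consumed. One way to achieve this is to use a second queue through which the consumer signals the producer each time an item has been consumed. This is sometimes called a feedback loop.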

{% code title="PYTHON" %}

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue  # feedback

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            self.feedback_queue.get()  # Wait until the consumer signals
            time.sleep(1)              # Simulate time-consuming work
            print(f"Produced {count}")
            count += 1

class Consumer():
    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue

    def consume(self):
        while True:
            # Polling qsize() is for demonstration; a blocking get() alone would also work
            if self.queue.qsize() > 0:
                data = self.queue.get()
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)               # Simulate time-consuming work
                print(f"Consumed {data}")

q = queue.Queue()
feedback_q = queue.Queue()

producer = Producer(q, feedback_q)
consumer = Consumer(q, feedback_q)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
```

{% endcode %}

**Output:**

```txt
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...
```

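The same pattern extends to batches: the producer enqueues `batch_size` items, then waits for a single feedback signal before producing the next batch.

{% code title="PYTHON" fullWidth="false" %}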

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer():
    """
    Represents a producer that generates a specified number of items in batches.

    Attributes:
        queue (queue.Queue): The queue to put items into.
        feedback_queue (queue.Queue): A queue used for receiving feedback from the consumer.
        batch_size (int): The number of items to produce in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def produce(self):
        """
        Continuously produces items in batches of size batch_size and puts them into the queue.
        Waits for feedback from the consumer before producing the next batch.
        """ 
        while True:
            count = 0  # Item numbering restarts at 0 for every batch
            for _ in range(self.batch_size):
                self.queue.put(count)
                count += 1
            self.feedback_queue.get()  # Wait until the consumer signals
            time.sleep(1)              # Simulate time-consuming work
            print(f"Produced {count - self.batch_size} to {count - 1}")
            
class Consumer():
    """
    Represents a consumer that consumes items from a queue in batches.

    Attributes:
        queue (queue.Queue): The queue to get items from.
        feedback_queue (queue.Queue): A queue used for sending feedback to the producer.
        batch_size (int): The number of items to consume in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def consume(self):
        """
        Continuously consumes items from the queue in batches of size batch_size.
        Sends feedback to the producer after consuming each batch.
        """
        while True:
            if self.queue.qsize() >= self.batch_size:
                batch_data = []
                for _ in range(self.batch_size):
                    batch_data.append(self.queue.get())
                self.feedback_queue.put(1)  # Send feedback
                time.sleep(1)               # Simulate time-consuming work
                print(f"Consumed {batch_data}")

q = queue.Queue()
feedback_q = queue.Queue()

batch_size = 5
producer = Producer(q, feedback_q, batch_size)
consumer = Consumer(q, feedback_q, batch_size)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
```

{% endcode %}

**Output:**

```txt
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...
```
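
Because the examples above loop forever, the ordering guarantee is hard to verify by eye. The following is a small finite sketch of the same feedback idea, assuming a hypothetical shared `log` list and a `None` sentinel to stop the consumer; it records events instead of sleeping so that the alternation is easy to check.

{% code title="PYTHON" %}

```python
import queue
from concurrent.futures import ThreadPoolExecutor

def produce(q, feedback_q, n, log):
    for i in range(n):
        log.append(f"produced {i}")  # Record before handing the item over
        q.put(i)
        feedback_q.get()             # Wait until the consumer confirms
    q.put(None)                      # Sentinel: nothing more to consume

def consume(q, feedback_q, log):
    while True:
        item = q.get()
        if item is None:
            break
        log.append(f"consumed {item}")
        feedback_q.put(1)            # Tell the producer to continue

q = queue.Queue()
feedback_q = queue.Queue()
log = []

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(produce, q, feedback_q, 3, log)
    executor.submit(consume, q, feedback_q, log)

print(log)
# Output: ['produced 0', 'consumed 0', 'produced 1', 'consumed 1', 'produced 2', 'consumed 2']
```

{% endcode %}

The producer cannot record item `i + 1` until the consumer has recorded item `i` and sent its feedback, which is why the log strictly alternates.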

### Example – Monitoring the queue size

***

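Although `qsize()` is not always reliable for exact counts, it is useful when you need to monitor the queue size in a non-critical way, for example logging the queue size periodically or triggering an action when the queue grows beyond a certain size.

Suppose you have a system in which a producer adds tasks to a queue and a consumer removes them. If the queue grows too large, the consumer may not be keeping up with the producer, which can be a sign that something is wrong.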

{% code title="PYTHON" %}

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class Producer:
    def __init__(self, queue):
        self.queue = queue

    def produce(self):
        count = 0
        while True:
            self.queue.put(count)
            count += 1
            time.sleep(0.5)
            print(f"Produced {count}")

class Consumer:
    def __init__(self, queue):
        self.queue = queue

    def consume(self):
        while True:
            if self.queue.qsize() > 0:
                data = self.queue.get()
                time.sleep(1)
                print(f"Consumed {data}")

class Monitor:
    def __init__(self, queue):
        self.queue = queue

    def monitor_queue_size(self):
        while True:
            size = self.queue.qsize()
            print(f"Queue size: {size}")
            if size > 5:
                print("Warning: Queue size exceeded 5!")
            time.sleep(2)  # Check the queue size every 2 seconds

q = queue.Queue()

producer = Producer(q)
consumer = Consumer(q)
monitor = Monitor(q)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
    executor.submit(monitor.monitor_queue_size)
```

{% endcode %}

**Output:**

```txt
Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 5!
Consumed 5
Produced 12
Produced 13
...
```
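
The monitoring logic can also be factored into a small function that is easy to call from a scheduler or a logging loop. This is only a sketch: `check_backlog()` and its `threshold` parameter are hypothetical names, and the value it reads from `qsize()` is still approximate.

{% code title="PYTHON" %}

```python
import logging
import queue

logger = logging.getLogger(__name__)

def check_backlog(q, threshold=5):
    """Log the approximate queue size; return True if it exceeds threshold."""
    size = q.qsize()  # Snapshot only; may be stale under concurrency
    logger.info("Queue size: %d", size)
    if size > threshold:
        logger.warning("Queue size exceeded %d!", threshold)
        return True
    return False

q = queue.Queue()
for i in range(6):
    q.put(i)

print(check_backlog(q))                # Output: True
print(check_backlog(q, threshold=10))  # Output: False
```

{% endcode %}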

## join(), task\_done()

***

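`join()` blocks the calling thread (typically the main thread) until every item in the queue has been retrieved and processed, that is, until `task_done()` has been called for each item. If `task_done()` is never called, `q.join()` blocks indefinitely.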

{% code title="PYTHON" %}

```python
join()
```

{% endcode %}

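`task_done()` indicates that a previously enqueued task is complete (it tells the queue that the item has been processed). It is most often used together with multi-threading; for multi-processing, `multiprocessing.JoinableQueue` provides the same method.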

{% code title="PYTHON" %}

```python
task_done()
```

{% endcode %}
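
Calls to `task_done()` must be balanced with the items placed in the queue. According to the `queue` documentation, calling it more times than there were items raises `ValueError`; a minimal sketch, in which the `error` variable exists only for illustration:

{% code title="PYTHON" %}

```python
import queue

q = queue.Queue()
q.put("job")

q.get()
q.task_done()          # Matches the single put(): OK

error = None
try:
    q.task_done()      # One call too many
except ValueError as exc:
    error = exc
    print(f"ValueError: {exc}")
```

{% endcode %}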

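A consumer thread should call `task_done()` each time it finishes processing an item it obtained with `get()`. This matters especially when you use `join()`, which blocks the calling thread until every item in the queue has been retrieved and processed (meaning `task_done()` has been called for each item).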

{% hint style="info" %}
The `Queue` class is commonly used for communication and coordination between threads, where one thread (the producer) adds items to the queue and another thread (the consumer) takes items from it.
{% endhint %}
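
To see why the matching `task_done()` call matters, the sketch below runs `q.join()` in a helper thread and checks whether it has returned. The name `waiter` and the timeout values are arbitrary choices for illustration.

{% code title="PYTHON" %}

```python
import queue
import threading

q = queue.Queue()
q.put("job")
q.get()                      # Item retrieved, but not yet marked done

# Run q.join() in a daemon thread so a stuck join cannot hang the program
waiter = threading.Thread(target=q.join, daemon=True)
waiter.start()

waiter.join(timeout=0.2)
still_blocked = waiter.is_alive()
print(still_blocked)         # Output: True (join() is still waiting)

q.task_done()                # Mark the item as processed
waiter.join(timeout=1)
print(waiter.is_alive())     # Output: False (join() has returned)
```

{% endcode %}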

### Example

***

{% code title="PYTHON" %}

```python
import concurrent.futures
import queue
import time

def producer(queue):
    # Populate the queue with items
    print("Starting producer...")
    for i in range(10):
        print(f"Producing {i}")
        queue.put(i)
        time.sleep(0.2)
    queue.put(None)           # Sentinel value
    print("[Produced]")

def consumer(queue):
    print("Starting consumer...")
    while True:
        item = queue.get()
        if item is None:      # Check for sentinel value
            queue.task_done()
            break
        time.sleep(0.2)
        queue.task_done()
        print(f"Consumed {item}")

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)
    q.join()
print("All tasks have been completed.")
```

{% endcode %}

**Output:**

```txt
Starting producer...
Producing 0
Starting consumer...
Producing 1
Consumed 0
Producing 2
Consumed 1
Consumed 2
Producing 3
Producing 4
...
```

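The second example fills the queue in advance and lets five workers drain it; `q.join()` returns once every item has been marked done.

{% code title="PYTHON" %}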

```python
import concurrent.futures
import queue
import time

def worker(q):
    while True:
        try:
            # get_nowait() avoids the race between an empty() check and get()
            item = q.get_nowait()
        except queue.Empty:
            break
        time.sleep(0.1)
        print(f"Completed item: {item}")
        q.task_done()

def main():
    # Create queue and populate it with items
    q = queue.Queue()
    for i in range(20):
        q.put(i)

    # Create worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit one task per worker
        workers = [executor.submit(worker, q) for _ in range(5)]

        # Wait until every item has been marked done
        q.join()
        print("All tasks have been processed.")

if __name__ == "__main__":
    main()
```

{% endcode %}

**Output:**

```txt
Completed item: 4
Completed item: 2
Completed item: 3
Completed item: 0
Completed item: 1
....
Completed item: 19
Completed item: 18
Completed item: 15
Completed item: 17
Completed item: 16
All tasks have been processed.
```
