from queue import Queue

# Create a bounded queue that holds at most five items.
q = Queue(maxsize=5)

# Fill the queue to capacity; a further blocking put() would wait for space.
for i in range(5):
    q.put(i)

# full() reports True once the number of queued items reaches maxsize.
print(q.full())  # Output: True
empty():如果 queue 為空則返回 True,否則返回 False。
PYTHON
import queue

# Create a new, unbounded queue.
my_queue = queue.Queue()

# Check whether the queue currently holds any items.
# NOTE: with several threads, the answer can be stale by the time it is used.
if my_queue.empty():
    print("The queue is empty.")  # This branch runs: nothing has been put yet.
else:
    print("The queue is not empty.")
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...
PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class Producer:
    """Produce items in fixed-size batches, pacing itself on consumer feedback.

    Attributes:
        queue: The queue to put items into.
        feedback_queue: A queue used for receiving feedback from the consumer.
        batch_size: The number of items to produce in each batch.
        delay: Seconds to sleep after each batch to simulate slow work.
    """

    def __init__(self, queue, feedback_queue, batch_size, delay: float = 1):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size
        self.delay = delay

    def produce(self, max_batches: Optional[int] = None) -> None:
        """Put batches of ``batch_size`` items (0, 1, 2, ...) into the queue.

        After each batch, block until the consumer sends one feedback token
        before producing the next batch.

        Args:
            max_batches: Stop after this many batches. ``None`` (the default)
                keeps producing forever.
        """
        batches_done = 0
        while max_batches is None or batches_done < max_batches:
            # Numbering restarts at 0 for every batch.
            count = 0
            for _ in range(self.batch_size):
                self.queue.put(count)
                count += 1
            self.feedback_queue.get()  # Wait until the consumer acknowledges.
            time.sleep(self.delay)  # Simulate time-consuming work.
            print(f"Produced {count - self.batch_size} to {count - 1}")
            batches_done += 1


class Consumer:
    """Consume items from a queue in fixed-size batches and acknowledge each one.

    Attributes:
        queue: The queue to get items from.
        feedback_queue: A queue used for sending feedback to the producer.
        batch_size: The number of items to consume in each batch.
        delay: Seconds to sleep after each batch to simulate slow work.
    """

    def __init__(self, queue, feedback_queue, batch_size, delay: float = 1):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size
        self.delay = delay

    def consume(self, max_batches: Optional[int] = None) -> None:
        """Take batches of ``batch_size`` items off the queue.

        After each batch, put one feedback token on ``feedback_queue`` so the
        producer can continue.

        Args:
            max_batches: Stop after this many batches. ``None`` (the default)
                keeps consuming forever.
        """
        batches_done = 0
        while max_batches is None or batches_done < max_batches:
            # Blocking get() sleeps until an item arrives. This avoids
            # busy-polling qsize(), which burns CPU and is only approximate.
            batch_data = [self.queue.get() for _ in range(self.batch_size)]
            self.feedback_queue.put(1)  # Send feedback to the producer.
            time.sleep(self.delay)  # Simulate time-consuming work.
            print(f"Consumed {batch_data}")
            batches_done += 1


q = queue.Queue()
feedback_q = queue.Queue()
batch_size = 5
producer = Producer(q, feedback_q, batch_size)
consumer = Consumer(q, feedback_q, batch_size)

# The demo never returns (both loops run forever), so only start it when the
# file is executed as a script, not when it is imported.
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer.produce)
        executor.submit(consumer.consume)
執行結果:
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...
Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 10!
Consumed 5
Produced 12
Produced 13
...
join(), task_done()
join() 用於阻塞 main thread,直到 queue 中的所有項目都已被取出並處理完畢(即每個項目都已調用 task_done())。如果不調用 task_done(),q.join() 將無限期地阻塞。