from queue import Queue
# Build a bounded queue that can hold at most five items.
q = Queue(maxsize=5)
# Fill the queue up to its capacity.
for i in range(5):
    q.put(i)
# With five items queued, the queue reports itself as full.
print(q.full())  # Output: True
empty() 如果 queue 為空返回 True,否則返回 False。
PYTHON
import queue
# Start with a fresh, unbounded queue.
my_queue = queue.Queue()
# Report whether anything has been queued yet.
if not my_queue.empty():
    print("The queue is not empty.")
else:
    print("The queue is empty.")  # ✔️✔️
qsize() 返回 queue 的大致大小,通常用於檢查當前 queue 中的項目數量。
import queue
# Start with an unbounded queue.
q = queue.Queue()
# Enqueue the integers 0 through 4.
for i in range(5):
    q.put(i)
# qsize() reports how many items are currently queued.
print(q.qsize())  # Output: 5
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
def producer(q):
    """Put the integers 0-9 on ``q``, followed by a ``None`` sentinel.

    Every item is delivered: ``put()`` blocks while a bounded queue is full
    and resumes as soon as a consumer frees a slot. Guarding the call with
    ``q.full()`` is both unnecessary and harmful, because the check-then-put
    pattern is racy and silently skips items whenever the queue happens to
    be full.

    Args:
        q: The queue (``queue.Queue``-compatible) to put items on.
    """
    for i in range(10):
        print("Producing", i)
        # Blocking put: waits for free space instead of dropping the item.
        q.put(i)
    q.put(None)  # Sentinel value
def consumer(q):
    """Drain ``q`` until the ``None`` sentinel arrives.

    Each real item is printed and acknowledged with ``task_done()``; the
    sentinel itself is acknowledged too, so ``q.join()`` can return.
    """
    received = q.get()
    while received is not None:
        print(f"Consumed {received}")
        q.task_done()
        received = q.get()
    # The sentinel also counts as a queued task and must be acknowledged.
    q.task_done()
# Bounded queue shared by the producer and the consumer thread.
q = Queue(maxsize=5)
# Run both sides concurrently; leaving the ``with`` block waits for them.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)
# Block until all tasks are done
q.join()
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer():
    """
    Producer that emits consecutive integers, one per consumer acknowledgement.

    Attributes:
        queue: The queue items are put on.
        feedback_queue: Queue the producer reads one token from after each
            item, so it never runs ahead of the consumer.
    """

    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue  # feedback

    def produce(self, max_items=None, delay=1):
        """
        Put 0, 1, 2, ... on the queue, waiting for feedback after each item.

        Args:
            max_items: Stop after producing this many items. ``None``
                (the default) keeps producing forever.
            delay: Seconds to sleep after each item to simulate work.
        """
        count = 0
        while max_items is None or count < max_items:
            item = count
            self.queue.put(item)
            count += 1
            self.feedback_queue.get()  # Wait until feedback
            time.sleep(delay)  # Simulation time-consuming
            # Report the value that was queued, not the already-incremented
            # counter, so the log lines up with the consumer's "Consumed N".
            print(f"Produced {item}")
class Consumer():
    """
    Consumer that takes one item at a time and acknowledges each to the producer.

    Attributes:
        queue: The queue items are taken from.
        feedback_queue: Queue on which one token is put per consumed item.
    """

    def __init__(self, queue, feedback_queue):
        self.queue = queue
        self.feedback_queue = feedback_queue

    def consume(self, max_items=None, delay=1):
        """
        Take items from the queue and send feedback after each one.

        Items are fetched with a blocking ``get()``, so the thread sleeps
        until data arrives instead of spinning on ``qsize()``.

        Args:
            max_items: Stop after consuming this many items. ``None``
                (the default) keeps consuming forever.
            delay: Seconds to sleep after each item to simulate work.
        """
        consumed = 0
        while max_items is None or consumed < max_items:
            data = self.queue.get()  # Blocks until an item is available
            self.feedback_queue.put(1)  # Send feedback
            time.sleep(delay)  # Simulation time-consuming
            print(f"Consumed {data}")
            consumed += 1
# Acknowledgement channel from the consumer back to the producer.
feedback_q = queue.Queue()
# Work queue carrying the produced items.
q = queue.Queue()
consumer = Consumer(q, feedback_q)
producer = Producer(q, feedback_q)
# One pool thread runs the producer loop, the other the consumer loop.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
執行結果:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Consumed 3
...
PYTHON
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer():
    """
    Batch producer: queues ``batch_size`` items, then waits for the consumer.

    Attributes:
        queue (queue.Queue): Destination queue for produced items.
        feedback_queue (queue.Queue): Queue the producer reads one token from
            after every batch.
        batch_size (int): How many items make up one batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.batch_size = batch_size
        self.feedback_queue = feedback_queue
        self.queue = queue

    def produce(self):
        """
        Produce batches forever.

        Each batch restarts numbering at 0, so every batch holds the values
        0 .. batch_size - 1. After queuing a batch the producer blocks on the
        feedback queue, sleeps one second and logs the range it produced.
        """
        while True:
            batch = range(self.batch_size)
            for value in batch:
                self.queue.put(value)
            count = len(batch)
            self.feedback_queue.get()  # Wait until feedback
            time.sleep(1)  # Simulation time-consuming
            print(f"Produced {count - self.batch_size} to {count - 1}")
class Consumer():
    """
    Represents a consumer that consumes items from a queue in batches.

    Attributes:
        queue (queue.Queue): The queue to get items from.
        feedback_queue (queue.Queue): A queue used for sending feedback to the producer.
        batch_size (int): The number of items to consume in each batch.
    """

    def __init__(self, queue, feedback_queue, batch_size):
        self.queue = queue
        self.feedback_queue = feedback_queue
        self.batch_size = batch_size

    def consume(self, max_batches=None, delay=1):
        """
        Consume items from the queue in batches of size batch_size.

        Each item is fetched with a blocking ``get()``, so the thread sleeps
        until a full batch has been collected instead of polling ``qsize()``
        in a busy loop. One feedback token is sent to the producer after
        every batch.

        Args:
            max_batches: Stop after this many batches. ``None`` (the
                default) keeps consuming forever.
            delay: Seconds to sleep after each batch to simulate work.
        """
        batches_done = 0
        while max_batches is None or batches_done < max_batches:
            # Blocking gets: wait for each item rather than spinning.
            batch_data = [self.queue.get() for _ in range(self.batch_size)]
            self.feedback_queue.put(1)  # Send feedback
            time.sleep(delay)  # Simulation time-consuming
            print(f"Consumed {batch_data}")
            batches_done += 1
# Both sides agree on how many items form one batch.
batch_size = 5
# Acknowledgement channel from the consumer back to the producer.
feedback_q = queue.Queue()
# Work queue carrying the produced items.
q = queue.Queue()
consumer = Consumer(q, feedback_q, batch_size)
producer = Producer(q, feedback_q, batch_size)
# One pool thread per role.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
執行結果:
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
Produced 0 to 4
Consumed [0, 1, 2, 3, 4]
Consumed [0, 1, 2, 3, 4]
Produced 0 to 4
...
import queue
import time
from concurrent.futures import ThreadPoolExecutor
class Producer:
    """Producer that queues a new integer every half second, forever."""

    def __init__(self, queue):
        self.queue = queue

    def produce(self):
        """Queue 0, 1, 2, ... and log the running total after each item."""
        produced = 0
        while True:
            self.queue.put(produced)
            time.sleep(0.5)
            produced += 1
            # The message reports how many items have been queued so far.
            print(f"Produced {produced}")
class Consumer:
    """Consumer that takes items from the queue one at a time.

    Attributes:
        queue: The queue items are taken from.
    """

    def __init__(self, queue):
        self.queue = queue

    def consume(self, max_items=None, delay=1):
        """Take items from the queue and log each one.

        Items are fetched with a blocking ``get()``, so the thread sleeps
        while the queue is empty instead of spinning on ``qsize()``.

        Args:
            max_items: Stop after consuming this many items. ``None``
                (the default) keeps consuming forever.
            delay: Seconds to sleep after each item to simulate work.
        """
        consumed = 0
        while max_items is None or consumed < max_items:
            data = self.queue.get()  # Blocks until an item is available
            time.sleep(delay)
            print(f"Consumed {data}")
            consumed += 1
class Monitor:
    """Periodically reports the size of a queue and warns when it grows."""

    def __init__(self, queue):
        self.queue = queue

    def monitor_queue_size(self):
        """Every two seconds, print the queue size; warn when it is above 5."""
        while True:
            pending = self.queue.qsize()
            print(f"Queue size: {pending}")
            if pending > 5:
                print("Warning: Queue size exceeded 5!")
            time.sleep(2)  # Pause before the next check
# A single unbounded queue shared by all three roles.
q = queue.Queue()
monitor = Monitor(q)
consumer = Consumer(q)
producer = Producer(q)
# Three pool threads: producer, consumer and the size monitor.
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(producer.produce)
    executor.submit(consumer.consume)
    executor.submit(monitor.monitor_queue_size)
執行結果:
Queue size: 0
Produced 1
Consumed 0
Produced 2
Produced 3
Queue size: 2
Produced 4
Consumed 1
Produced 5
Consumed 2
Produced 6
Produced 7
Queue size: 4
Produced 8
Consumed 3
Produced 9
Consumed 4
Produced 10
Produced 11
Queue size: 6
Warning: Queue size exceeded 5!
Consumed 5
Produced 12
Produced 13
...
join(), task_done()
join() 用於阻塞 main thread,直到 queue 中的所有項目都被取出並處理完畢(即每個項目都調用了 task_done())。如果不調用 task_done(),q.join() 將無限期地阻塞。
import concurrent.futures
import queue
import time
def producer(queue):
    """Put 0-9 on ``queue`` at 0.2 s intervals, then a ``None`` sentinel."""
    print("Starting producer...")
    for number in range(10):
        print(f"Producing {number}")
        queue.put(number)
        time.sleep(0.2)
    # Tell the consumer that no more items will follow.
    queue.put(None)  # Sentinel value
    print("[Produced]")
def consumer(queue):
    """Process items from ``queue`` until the ``None`` sentinel arrives.

    Every item, the sentinel included, is acknowledged with ``task_done()``
    so that ``queue.join()`` can return.
    """
    print("Starting consumer...")
    item = queue.get()
    while item is not None:
        time.sleep(0.2)
        queue.task_done()
        print(f"Consumed {item}")
        item = queue.get()
    # Acknowledge the sentinel as well; it was queued like any other item.
    queue.task_done()
# Unbounded queue shared by the producer and the consumer.
q = queue.Queue()
# Run both sides concurrently; leaving the ``with`` block waits for them.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(producer, q)
    executor.submit(consumer, q)
# Returns once every queued item has been acknowledged with task_done().
q.join()
print("All tasks have been completed.")
import concurrent.futures
import queue
import time
def worker(q):
    """Process items from ``q`` until it is empty, then return.

    Items are taken with ``get_nowait()`` rather than the
    ``while not q.empty(): q.get()`` pattern. With several workers sharing
    one queue, another worker can take the last item between the
    ``empty()`` check and the ``get()`` call, leaving this worker blocked
    in ``get()`` forever. ``get_nowait()`` makes the check and the fetch a
    single atomic step.

    Args:
        q: A ``queue.Queue`` pre-populated with the items to process.
    """
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            # Nothing left to do for this worker.
            return
        time.sleep(0.1)
        print(f"Completed item: {item}")
        q.task_done()
def main():
    """Fill a queue with 20 items and drain it with five pool workers."""
    # Populate the work queue before any worker starts.
    q = queue.Queue()
    for number in range(20):
        q.put(number)
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # One task per worker thread, all sharing the same queue.
        workers = []
        for _ in range(5):
            workers.append(executor.submit(worker, q))
        # Returns once every item has been acknowledged with task_done().
        q.join()
    print("All tasks have been processed.")


if __name__ == "__main__":
    main()