时间:2022-11-20 10:13:45 | 栏目:JAVA代码 | 点击:次
问题:
1.什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型?
2. 生产者消费者模型的作用是什么?
在生产者-消费者模式中,通常有两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向消息队列加入数据,消费者线程则从消息队列消耗数据。生产者和消费者、消息队列之间的关系结构图如图:
(1) 消息队列可以用来平衡生产和消费的线程资源;
(2) 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据 ;
(3) 消息队列是有容量限制的,消息队列满后,生产者不能再加入数据;消息队列空时,消费者不能再取出数据;
(4) 消息队列是线程安全的,在并发操作消息队列的过程中,不能出现数据不一致的情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况;
(5) JDK 中各种阻塞队列,采用的就是这种模式;
1、消息队列中存放的消息类:
/** * 消息队列中存放的消息类 */ final public class Message { private int id; private int value; public Message(int id,int value){ this.id = id; this.value = value; } public int getId() { return id; } public int getValue() { return value; } }
2、实现阻塞队列(消息队列) :
import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; /** * 实现一个阻塞队列(消息队列),实现java线程间通信 */ @Slf4j public class MessageQueue { // 消息队列的容量 private int capacity; // 消息队列 LinkedList<Message> messageQueue = new LinkedList<>(); // 设置消息队列的容量 public MessageQueue(int capacity){ this.capacity = capacity; } // 从消息队列中取消息 public Message take(){ synchronized (messageQueue){ // 如果消息队列为空 while (messageQueue.isEmpty()){ try { log.debug("队列为空, 消费者线程等待"); messageQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = messageQueue.removeFirst(); log.debug("已消费消息 {}", message); // 走到这,说明消息队列不为null messageQueue.notifyAll(); return message; } } // 往消息队列中放消息 public void put(Message message){ synchronized (messageQueue){ // 如果消息队列已满 while (messageQueue.size()==capacity){ try { log.debug("队列已满, 生产者线程等待"); messageQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } messageQueue.addLast(message); log.debug("已生产消息 {}", message); // 走到这,说明消息队列不满 messageQueue.notifyAll(); } } }
3、测试:
public class Main { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2); for(int i=0;i<3;i++){ int id = i; new Thread(()->{ queue.put(new Message(id,id)); },"生产者").start(); } new Thread(()->{ while (true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = queue.take(); } },"消费者").start(); } }
执行结果:
15:31:28.488 [生产者] DEBUG com.example.test.MessageQueue - 已生产消息 com.example.test.Message@54309a75
15:31:28.507 [生产者] DEBUG com.example.test.MessageQueue - 已生产消息 com.example.test.Message@50915389
15:31:28.507 [生产者] DEBUG com.example.test.MessageQueue - 队列已满, 生产者线程等待
15:31:29.486 [消费者] DEBUG com.example.test.MessageQueue - 已消费消息 com.example.test.Message@54309a75
15:31:29.486 [生产者] DEBUG com.example.test.MessageQueue - 已生产消息 com.example.test.Message@6340ac12
15:31:30.487 [消费者] DEBUG com.example.test.MessageQueue - 已消费消息 com.example.test.Message@50915389
15:31:31.487 [消费者] DEBUG com.example.test.MessageQueue - 已消费消息 com.example.test.Message@6340ac12
15:31:32.488 [消费者] DEBUG com.example.test.MessageQueue - 队列为空, 消费者线程等待
(1) 通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率 ;
(2) 解耦,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约;