Java NIO 中 Selector 解析
一、Selector 简介
1、Selector 和 Channel 关系
Selector
一般称为选择器,可以翻译为 多路复用。它是 Java NIO 核心组件中的一个,用于检查一个或者多个 NIO Channel (通道) 的状态是否处于可读、可写。如此可以实现单线程管理多个 Channels , 也就是可以管理多个网络链接。
使用 Selector 的好处在于:使用更少的线程就可以来处理通道了,相比使用多个线程,避免了线程上下文切换带来的开销。
2、可选择通道(SelectableChannel)
(1)不是所有的 Channel 都是可以被 Selector 复用的。比方说, FileChannel 就不能被选择器复用。判断一个 Channel 能被 Selector 复用,有一个前提:判断他是否继承了一个抽象类 SelectableChannel。如果继承了 SelectableChannel , 则可以被复用,否则不能。
(2)SelectableChannel
提供了实现通道选择性所需要的公共方法。它是所有支持就绪检查通道类的父类,所有 socket 通道,都继承 SelectableChannel 类都是可选择的,包括从管道(Pipe) 对象的中获取得到的通道。而 FileChannel 类,没有继承 SelectableChannel , 因此是不是可选通道。
(3)一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。通道和选择器之间的关系,使用注册的方式完成。SelectableChannel 可以被注册到 Selector 对象上,在注册时候,需要指定通道的那些操作,是 Selector 感兴趣的。
3、Channel 注册到 Selector
(1)使用Channel.register(Selector sel, int pos)
方法,将一个通道注册到一个选择器时。第一个参数,指定通道要注册的选择器。第二个参数指定选择器需要查询的通道操作。
(2)可供选择器查询的通道操作,从类型类分,包括一下四种:
- 可读:SelectionKey.OP_READ
- 可写:SelectionKey.OP_WRITE
- 连接:SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
如果 Selector
对通道的多操作类型感兴趣,可以用“位或”操作符来实现:
比如int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
(3)选择器查询的不是通道的操作,而是通道的某个操作的一种就绪状态。什么操作的就绪状态?一旦通道具备完成某个操作的条件,表示该通道的某个操作已经就绪,就可以被 Selector 查询到,程序可以对通道进行对应的操作。比方说,某个 SocketChannel 通道可以连接到一个服务器,则处于“连接就绪”状态(OP_CONNECT)。 再比方说,一个ServerSocketChannel 服务器通道准备好接收新进入的连接,则处于“接收就绪”(OP_ACCEPT)状态。还比方说,一个数据可读的通道,可以说是“读就绪”(OP_READ)。一个等待写数据的通道可以说是“写就绪”(OP_WRITE)。
4、选择键(SelectionKey)
(1)Channel 注册之后,并且一旦通道处于某种就绪状态,就可以被选择器查询到。这个工作使用选择器 Selector 的 select() 方法完成。select 方法的作用,对感兴趣的通道操作,进行就绪状态的查询。
(2)Selector 可以不断的查询 Channel 中发生的操作的就绪状态。并且选择甘心去的操作就绪状态。一旦通道有操作的就绪状态达成,并且是 Selecor 感兴趣的操作,就会被 Selector 选中,放入选择键集合中。
(3)一个选择键,首先包含了注册在 Selector 的通道操作的类型,比方说: SelectionKey.OP_READ . 也包含了特定的通道与特定的选择器之间的注册关系。
开发应用程序是,选择键是编程的关键,NIO 编程,就是更具对应的选择键,进行不同的业务逻辑处理。
(4)选择键的概念,和事件的概念比较相似。一个选择键类似监听器模式里面的一个事件。由于 Selector 不是事件触发的模式,而是主动去查询的模式,所以不叫事件 Event, 而是叫 SelectionKey 选择键。
二、Selector 的使用方法
1、Selector 的创建
通过 Selector.open() 方法创建一个 Selector 对象。如下;
// 获取 Selector 选择器 Selector selector = Selector.open();
2、注册 Channel 到 Selector
要实现 Selector
管理 Channel
, 需要将 channel 注册到相应的 Selector 上
// 1. 获取 Selector 选择器 Selector selector = Selector.open(); // 2. 获取通道 ServerSocketChannel socketChannel = ServerSocketChannel.open(); // 3. 设置为非阻塞 socketChannel.configureBlocking(false); // 4. 绑定连接 socketChannel.bind(new InetSocketAddress(9999)); // 5. 将通道注册到选择器 socketChannel.register(selector, SelectionKey.OP_ACCEPT);
上面通过调用通道的 register() 方法会将它注册到一个选择器上。
需要注意的是:
(1)与 Selector 一起使用, channel 必须处于非阻塞模式下,否则将抛出异常 IllegalBlockingModeException 。 这意味着,FileChannel 不能与 Selector 一起使用,因为 FileChannel 不能切换到非阻塞模式,而套接字相关的所有通道都可以。
(2)一个通道,并没有一定要持有所有的四种操作。比如服务器通道 ServerSocketChannel 支持 Accept 接收操作,而 SocketChannel 客户端通道则不支持。可以通过通道上的 vildOps() 方法,来获取特定通道下所支持的操作集合。
3、轮训查询就绪操作
(1) 通过 Selector 的 select() 方法, 可以查询出已经就绪的通道操作,有些就绪的状态集合,包含在一个元素是 Selectionkey 对象的 Set 集合中
(2) 下面是 Selector 几个重载的查询 select() 方法:
select()
阻塞到至少有一个通道在你注册的事件上就绪。select(long timeout)
和select()
一样,但最长阻塞事件为 timeout 毫秒。selectNow()
非阻塞,只要有通道就立即返回。select()
方法返回的 int 之,表示有多少通道已经就绪,准确的说目前一次 select
方法来到这一次 select 方法之间的时间段上,有多少个通道编程了就绪状态。
例如:首次调用 select() 方法,如果有一个通道编程了就绪状态,返回了 1 , 若子啊次调用 select() 方法,如果另外一个通道就绪了,它会再次返回 1。 如果第一个就绪的 chnanel 么有做任何操作,现在就有两个就绪通道,但是每次 select() 方法调用之间,只有一个通道就绪了。
一旦调用 select() 方法,并且返回值部位 0 时,在 Selector 中有一个 seletedKeys() 方法,用来范围已选择键集合,迭代集合的每个以元素,根据就绪操作的类型,完成对应的操作
// 查询已经就绪的通道操作 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 判断 key 就绪状态操作 if (key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } } iterator.remove();
4、停止选择的方法
选择器执行选择的过程汇总,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有一下三种方式可以唤醒在 select()
方法中阻塞的线程。
wakeup() 方法:通过调用 Selector 对象的 wakeup() 方法让处于阻塞状态的 select() 方法立刻返回
该方法使得选择器上的第一个哈没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次会对 select() 方法的一次调用立即返回。
close() 方法: 通过 close() 方法关闭 selector
该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似 wakeup()) , 同时使的注册到该 Selector 的所有 Channel 被注销,所有的键都被取消,但是 Channel 本身不会关闭。
三、示例代码
1、服务端代码
@Test public void server() throws IOException { //1. 获取服务端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2. 切换非阻塞模式 serverSocketChannel.configureBlocking(false); //3. 创建 buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put("收到了。。。。".getBytes(StandardCharsets.UTF_8)); //4. 绑定端口号 serverSocketChannel.bind(new InetSocketAddress(20000)); //5. 获取 selector 选择器 Selector selector = Selector.open(); //6. 通道注册到选择器,进行监听 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //7. 选择器进行轮训,进行后续操作 while (selector.select() > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator(); // 循环 while (selectionKeyIterator.hasNext()) { // 获取就绪状态 SelectionKey k = selectionKeyIterator.next(); // 操作判断 if (k.isAcceptable()) { // 获取连接 SocketChannel accept = serverSocketChannel.accept(); // 切换非阻塞模式 accept.configureBlocking(false); // 注册 accept.register(selector, SelectionKey.OP_READ); } else if (k.isReadable()) { SocketChannel socketChannel = (SocketChannel) k.channel(); readBuffer.clear(); socketChannel.read(readBuffer); readBuffer.flip(); System.out.println("received:" + new String(readBuffer.array(), StandardCharsets.UTF_8)); k.interestOps(SelectionKey.OP_WRITE); } else if (k.isWritable()) { writeBuffer.rewind(); SocketChannel socketChannel = (SocketChannel) k.channel(); socketChannel.write(writeBuffer); k.interestOps(SelectionKey.OP_READ); } } } }
2、客户端代码
@Test public void client() throws IOException { //1. 获取通道,绑定主机和端口号 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(20000)); //2. 切换到非阻塞模式 socketChannel.configureBlocking(false); //3. 创建 buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //4. 写入 buffer 数据 buffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8)); //5. 模式切换 buffer.flip(); //6. 写入通道 socketChannel.write(buffer); //7. 关闭 buffer.clear(); socketChannel.close(); }
3、NIO 编程步骤总结
- 1、创建一个 ServerSocketChannel 通道
- 2、设置为非阻塞模式
- 3、创建一个 Selector 选择器
- 4、Channel 注册到选择器中,监听连接事件
- 5、调用 Selector 中的 select 方法(循环调用),监听通道是否是就绪状态
- 6、调用 SelectKeys() 方法就能获取 就绪 channel 集合
- 7、遍历就绪的 channel 集合,判断就绪事件类型,实现具体的业务操作。
- 8、根据业务流程,判断是否需要再次注册事件监听事件,重复执行。
上一篇:Spring data jpa @Query update的坑及解决
栏 目:JAVA代码
下一篇:SpringBoot常见get/post请求参数处理、参数注解校验及参数自定义注解校验详解
本文地址:http://www.codeinn.net/misctech/206383.html