当前位置:主页 > 软件编程 > JAVA代码 >

Java NIO 中 Selector 解析

时间:2022-06-30 09:27:59 | 栏目:JAVA代码 | 点击:

一、Selector 简介

1、Selector 和 Channel 关系

Selector 一般称为选择器,可以翻译为 多路复用。它是 Java NIO 核心组件中的一个,用于检查一个或者多个 NIO Channel (通道) 的状态是否处于可读、可写。如此可以实现单线程管理多个 Channels , 也就是可以管理多个网络链接。

使用 Selector 的好处在于:使用更少的线程就可以来处理通道了,相比使用多个线程,避免了线程上下文切换带来的开销。

2、可选择通道(SelectableChannel)

(1)不是所有的 Channel 都是可以被 Selector 复用的。比方说, FileChannel 就不能被选择器复用。判断一个 Channel 能被 Selector 复用,有一个前提:判断他是否继承了一个抽象类 SelectableChannel。如果继承了 SelectableChannel , 则可以被复用,否则不能。

(2)SelectableChannel 提供了实现通道选择性所需要的公共方法。它是所有支持就绪检查通道类的父类,所有 socket 通道,都继承 SelectableChannel 类都是可选择的,包括从管道(Pipe) 对象的中获取得到的通道。而 FileChannel 类,没有继承 SelectableChannel , 因此是不是可选通道。

(3)一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。通道和选择器之间的关系,使用注册的方式完成。SelectableChannel 可以被注册到 Selector 对象上,在注册时候,需要指定通道的那些操作,是 Selector 感兴趣的。

3、Channel 注册到 Selector

(1)使用Channel.register(Selector sel, int pos) 方法,将一个通道注册到一个选择器时。第一个参数,指定通道要注册的选择器。第二个参数指定选择器需要查询的通道操作。

(2)可供选择器查询的通道操作,从类型类分,包括一下四种:

如果 Selector 对通道的多操作类型感兴趣,可以用“位或”操作符来实现:

比如int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

(3)选择器查询的不是通道的操作,而是通道的某个操作的一种就绪状态。什么操作的就绪状态?一旦通道具备完成某个操作的条件,表示该通道的某个操作已经就绪,就可以被 Selector 查询到,程序可以对通道进行对应的操作。比方说,某个 SocketChannel 通道可以连接到一个服务器,则处于“连接就绪”状态(OP_CONNECT)。 再比方说,一个ServerSocketChannel 服务器通道准备好接收新进入的连接,则处于“接收就绪”(OP_ACCEPT)状态。还比方说,一个数据可读的通道,可以说是“读就绪”(OP_READ)。一个等待写数据的通道可以说是“写就绪”(OP_WRITE)。

4、选择键(SelectionKey)

(1)Channel 注册之后,并且一旦通道处于某种就绪状态,就可以被选择器查询到。这个工作使用选择器 Selector 的 select() 方法完成。select 方法的作用,对感兴趣的通道操作,进行就绪状态的查询。

(2)Selector 可以不断的查询 Channel 中发生的操作的就绪状态。并且选择甘心去的操作就绪状态。一旦通道有操作的就绪状态达成,并且是 Selecor 感兴趣的操作,就会被 Selector 选中,放入选择键集合中。

(3)一个选择键,首先包含了注册在 Selector 的通道操作的类型,比方说: SelectionKey.OP_READ . 也包含了特定的通道与特定的选择器之间的注册关系。

开发应用程序是,选择键是编程的关键,NIO 编程,就是更具对应的选择键,进行不同的业务逻辑处理。

(4)选择键的概念,和事件的概念比较相似。一个选择键类似监听器模式里面的一个事件。由于 Selector 不是事件触发的模式,而是主动去查询的模式,所以不叫事件 Event, 而是叫 SelectionKey 选择键。

二、Selector 的使用方法

1、Selector 的创建

通过 Selector.open() 方法创建一个 Selector 对象。如下;

// 获取 Selector 选择器
Selector selector = Selector.open();

2、注册 Channel 到 Selector

要实现 Selector 管理 Channel , 需要将 channel 注册到相应的 Selector 上

// 1. 获取 Selector 选择器
Selector selector = Selector.open();

// 2. 获取通道
ServerSocketChannel socketChannel = ServerSocketChannel.open();

// 3. 设置为非阻塞
socketChannel.configureBlocking(false);

// 4. 绑定连接
socketChannel.bind(new InetSocketAddress(9999));

// 5. 将通道注册到选择器
socketChannel.register(selector, SelectionKey.OP_ACCEPT);

上面通过调用通道的 register() 方法会将它注册到一个选择器上。

需要注意的是:

(1)与 Selector 一起使用, channel 必须处于非阻塞模式下,否则将抛出异常 IllegalBlockingModeException 。 这意味着,FileChannel 不能与 Selector 一起使用,因为 FileChannel 不能切换到非阻塞模式,而套接字相关的所有通道都可以。

(2)一个通道,并没有一定要持有所有的四种操作。比如服务器通道 ServerSocketChannel 支持 Accept 接收操作,而 SocketChannel 客户端通道则不支持。可以通过通道上的 vildOps() 方法,来获取特定通道下所支持的操作集合。

3、轮训查询就绪操作

(1) 通过 Selector 的 select() 方法, 可以查询出已经就绪的通道操作,有些就绪的状态集合,包含在一个元素是 Selectionkey 对象的 Set 集合中

(2) 下面是 Selector 几个重载的查询 select() 方法:

  1. select() 阻塞到至少有一个通道在你注册的事件上就绪。
  2. select(long timeout) select() 一样,但最长阻塞事件为 timeout 毫秒。
  3. selectNow() 非阻塞,只要有通道就立即返回。
  4. select() 方法返回的 int 之,表示有多少通道已经就绪,准确的说目前一次 select

方法来到这一次 select 方法之间的时间段上,有多少个通道编程了就绪状态。

例如:首次调用 select() 方法,如果有一个通道编程了就绪状态,返回了 1 , 若子啊次调用 select() 方法,如果另外一个通道就绪了,它会再次返回 1。 如果第一个就绪的 chnanel 么有做任何操作,现在就有两个就绪通道,但是每次 select() 方法调用之间,只有一个通道就绪了。

一旦调用 select() 方法,并且返回值部位 0 时,在 Selector 中有一个 seletedKeys() 方法,用来范围已选择键集合,迭代集合的每个以元素,根据就绪操作的类型,完成对应的操作

// 查询已经就绪的通道操作
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();

    // 判断 key 就绪状态操作
    if (key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
}
iterator.remove();

4、停止选择的方法

选择器执行选择的过程汇总,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有一下三种方式可以唤醒在 select()方法中阻塞的线程。

wakeup() 方法:通过调用 Selector 对象的 wakeup() 方法让处于阻塞状态的 select() 方法立刻返回

该方法使得选择器上的第一个哈没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次会对 select() 方法的一次调用立即返回。

close() 方法: 通过 close() 方法关闭 selector

该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似 wakeup()) , 同时使的注册到该 Selector 的所有 Channel 被注销,所有的键都被取消,但是 Channel 本身不会关闭。

三、示例代码

1、服务端代码

@Test
public void server() throws IOException {
    //1. 获取服务端通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    //2. 切换非阻塞模式
    serverSocketChannel.configureBlocking(false);

    //3. 创建 buffer
    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    writeBuffer.put("收到了。。。。".getBytes(StandardCharsets.UTF_8));

    //4. 绑定端口号
    serverSocketChannel.bind(new InetSocketAddress(20000));

    //5. 获取 selector 选择器
    Selector selector = Selector.open();

    //6. 通道注册到选择器,进行监听
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    //7. 选择器进行轮训,进行后续操作
    while (selector.select() > 0) {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
        // 循环
        while (selectionKeyIterator.hasNext()) {
            // 获取就绪状态
            SelectionKey k = selectionKeyIterator.next();

            // 操作判断
            if (k.isAcceptable()) {
                // 获取连接
                SocketChannel accept = serverSocketChannel.accept();

                // 切换非阻塞模式
                accept.configureBlocking(false);

                // 注册
                accept.register(selector, SelectionKey.OP_READ);
            } else if (k.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) k.channel();
                readBuffer.clear();
                socketChannel.read(readBuffer);

                readBuffer.flip();
                System.out.println("received:" + new String(readBuffer.array(), StandardCharsets.UTF_8));
                k.interestOps(SelectionKey.OP_WRITE);
            } else if (k.isWritable()) {
                writeBuffer.rewind();

                SocketChannel socketChannel = (SocketChannel) k.channel();
                socketChannel.write(writeBuffer);
                k.interestOps(SelectionKey.OP_READ);
            }
        }
    }
}

2、客户端代码

@Test
public void client() throws IOException {

    //1. 获取通道,绑定主机和端口号
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(20000));

    //2. 切换到非阻塞模式
    socketChannel.configureBlocking(false);

    //3. 创建 buffer
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    //4. 写入 buffer 数据
    buffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8));

    //5. 模式切换
    buffer.flip();

    //6. 写入通道
    socketChannel.write(buffer);

    //7. 关闭
    buffer.clear();
    socketChannel.close();
}

3、NIO 编程步骤总结

您可能感兴趣的文章:

相关文章